From 122caec7b187820a0a7fc89e85eb8216dd57df21 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 28 Sep 2018 11:16:35 -0700 Subject: [PATCH] Add support targetCompactionSizeBytes for compactionTask (#6203) * Add support targetCompactionSizeBytes for compactionTask * fix test * fix a bug in keepSegmentGranularity * fix wrong noinspection comment * address comments --- .../timeline/partition/PartitionHolder.java | 7 + .../indexing/common/task/CompactionTask.java | 205 ++++++++-- .../druid/indexing/common/task/IndexTask.java | 184 +++++++-- .../batch/parallel/ParallelIndexSubTask.java | 20 +- .../common/task/CompactionTaskTest.java | 387 ++++++++++++++++-- .../indexing/common/task/TaskSerdeTest.java | 23 +- .../DataSourceCompactionConfig.java | 3 +- 7 files changed, 694 insertions(+), 135 deletions(-) diff --git a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java index 1ed0ae64bca..5e5f676e42f 100644 --- a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java +++ b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java @@ -26,6 +26,7 @@ import com.google.common.collect.Sets; import java.util.Iterator; import java.util.List; import java.util.SortedSet; +import java.util.Spliterator; import java.util.TreeSet; /** @@ -130,6 +131,12 @@ public class PartitionHolder implements Iterable> return holderSet.iterator(); } + @Override + public Spliterator> spliterator() + { + return holderSet.spliterator(); + } + public Iterable payloads() { return Iterables.transform(this, PartitionChunk::getObject); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ea1186aa3a7..cce1a743757 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -68,6 +68,7 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; @@ -86,6 +87,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; import java.util.stream.Collectors; @@ -101,13 +103,15 @@ public class CompactionTask extends AbstractTask private final List segments; private final DimensionsSpec dimensionsSpec; private final boolean keepSegmentGranularity; + @Nullable + private final Long targetCompactionSizeBytes; + @Nullable private final IndexTuningConfig tuningConfig; private final ObjectMapper jsonMapper; @JsonIgnore private final SegmentProvider segmentProvider; - @JsonIgnore - private List indexTaskSpecs; + private final PartitionConfigurationManager partitionConfigurationManager; @JsonIgnore private final AuthorizerMapper authorizerMapper; @@ -118,6 +122,9 @@ public class CompactionTask extends AbstractTask @JsonIgnore private final 
RowIngestionMetersFactory rowIngestionMetersFactory; + @JsonIgnore + private List indexTaskSpecs; + @JsonCreator public CompactionTask( @JsonProperty("id") final String id, @@ -127,6 +134,7 @@ public class CompactionTask extends AbstractTask @Nullable @JsonProperty("segments") final List segments, @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec, @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity, + @Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes, @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig, @Nullable @JsonProperty("context") final Map context, @JacksonInject ObjectMapper jsonMapper, @@ -149,9 +157,11 @@ public class CompactionTask extends AbstractTask this.keepSegmentGranularity = keepSegmentGranularity == null ? DEFAULT_KEEP_SEGMENT_GRANULARITY : keepSegmentGranularity; + this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; this.jsonMapper = jsonMapper; this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments); + this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; @@ -181,6 +191,14 @@ public class CompactionTask extends AbstractTask return keepSegmentGranularity; } + @Nullable + @JsonProperty + public Long getTargetCompactionSizeBytes() + { + return targetCompactionSizeBytes; + } + + @Nullable @JsonProperty public IndexTuningConfig getTuningConfig() { @@ -220,9 +238,9 @@ public class CompactionTask extends AbstractTask indexTaskSpecs = createIngestionSchema( toolbox, segmentProvider, + partitionConfigurationManager, dimensionsSpec, keepSegmentGranularity, - tuningConfig, jsonMapper ).stream() .map(spec -> new IndexTask( @@ -271,12 +289,12 @@ public class CompactionTask extends AbstractTask */ @VisibleForTesting static List createIngestionSchema( - TaskToolbox toolbox, - SegmentProvider segmentProvider, - DimensionsSpec dimensionsSpec, - boolean keepSegmentGranularity, - IndexTuningConfig tuningConfig, - ObjectMapper jsonMapper + final TaskToolbox toolbox, + final SegmentProvider segmentProvider, + final PartitionConfigurationManager partitionConfigurationManager, + final DimensionsSpec dimensionsSpec, + final boolean keepSegmentGranularity, + final ObjectMapper jsonMapper ) throws IOException, SegmentLoadingException { Pair, List>> pair = prepareSegments( @@ -290,26 +308,52 @@ public class CompactionTask extends AbstractTask return Collections.emptyList(); } + // find metadata for interval + final List> queryableIndexAndSegments = loadSegments( + timelineSegments, + segmentFileMap, + toolbox.getIndexIO() + ); + + final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig( + queryableIndexAndSegments + ); + if (keepSegmentGranularity) { - // if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index + // If keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index // task per segment interval. 
- final List specs = new ArrayList<>(timelineSegments.size()); - for (TimelineObjectHolder holder : timelineSegments) { + + //noinspection unchecked,ConstantConditions + final Map>> intervalToSegments = queryableIndexAndSegments + .stream() + .collect( + Collectors.toMap( + // rhs can't be null here, so we skip the null check and suppress the warning with the comment above + p -> p.rhs.getInterval(), + Lists::newArrayList, + (l1, l2) -> { + l1.addAll(l2); + return l1; + } + ) + ); + final List specs = new ArrayList<>(intervalToSegments.size()); + for (Entry>> entry : intervalToSegments.entrySet()) { + final Interval interval = entry.getKey(); + final List> segmentsToCompact = entry.getValue(); final DataSchema dataSchema = createDataSchema( segmentProvider.dataSource, - holder.getInterval(), - Collections.singletonList(holder), + interval, + segmentsToCompact, dimensionsSpec, - toolbox.getIndexIO(), - jsonMapper, - segmentFileMap + jsonMapper ); specs.add( new IndexIngestionSpec( dataSchema, - createIoConfig(toolbox, dataSchema, holder.getInterval()), - tuningConfig + createIoConfig(toolbox, dataSchema, interval), + compactionTuningConfig ) ); } @@ -319,18 +363,16 @@ public class CompactionTask extends AbstractTask final DataSchema dataSchema = createDataSchema( segmentProvider.dataSource, segmentProvider.interval, - timelineSegments, + queryableIndexAndSegments, dimensionsSpec, - toolbox.getIndexIO(), - jsonMapper, - segmentFileMap + jsonMapper ); return Collections.singletonList( new IndexIngestionSpec( dataSchema, createIoConfig(toolbox, dataSchema, segmentProvider.interval), - tuningConfig + compactionTuningConfig ) ); } @@ -368,21 +410,11 @@ public class CompactionTask extends AbstractTask private static DataSchema createDataSchema( String dataSource, Interval totalInterval, - List> timelineObjectHolder, + List> queryableIndexAndSegments, DimensionsSpec dimensionsSpec, - IndexIO indexIO, - ObjectMapper jsonMapper, - Map segmentFileMap + ObjectMapper jsonMapper ) - throws IOException { - // find metadata for interval - final List> queryableIndexAndSegments = loadSegments( - timelineObjectHolder, - segmentFileMap, - indexIO - ); - // find merged aggregators for (Pair pair : queryableIndexAndSegments) { final QueryableIndex index = pair.lhs; @@ -621,4 +653,105 @@ public class CompactionTask extends AbstractTask return usedSegments; } } + + @VisibleForTesting + static class PartitionConfigurationManager + { + @Nullable + private final Long targetCompactionSizeBytes; + @Nullable + private final IndexTuningConfig tuningConfig; + + PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, @Nullable IndexTuningConfig tuningConfig) + { + this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig); + this.tuningConfig = tuningConfig; + } + + @Nullable + IndexTuningConfig computeTuningConfig(List> queryableIndexAndSegments) + { + if (!hasPartitionConfig(tuningConfig)) { + final long nonNullTargetCompactionSizeBytes = Preconditions.checkNotNull( + targetCompactionSizeBytes, + "targetCompactionSizeBytes" + ); + // Find IndexTuningConfig.targetPartitionSize which is the number of rows per segment. + // Assume that the segment size is proportional to the number of rows. We can improve this later. 
+ final long totalNumRows = queryableIndexAndSegments + .stream() + .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.lhs.getNumRows()) + .sum(); + final long totalSizeBytes = queryableIndexAndSegments + .stream() + .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.rhs.getSize()) + .sum(); + + if (totalSizeBytes == 0L) { + throw new ISE("Total input segment size is 0 bytes"); + } + + final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes; + final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes)); + Preconditions.checkState(targetPartitionSize > 0, "Non-positive targetPartitionSize[%s]", targetPartitionSize); + + log.info( + "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]", + targetPartitionSize, + avgRowsPerByte, + nonNullTargetCompactionSizeBytes + ); + return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig) + .withTargetPartitionSize(targetPartitionSize); + } else { + return tuningConfig; + } + } + + /** + * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that + * targetCompactionSizeBytes cannot be used together with {@link IndexTuningConfig#targetPartitionSize}, + * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards}. + * {@link #hasPartitionConfig} checks whether any of those configs is set. + * + * This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig + * returns true. If targetCompactionSizeBytes is not set, this returns null or + * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} depending on the result of + * hasPartitionConfig. + */ + @Nullable + private static Long getValidTargetCompactionSizeBytes( + @Nullable Long targetCompactionSizeBytes, + @Nullable IndexTuningConfig tuningConfig + ) + { + if (targetCompactionSizeBytes != null) { + Preconditions.checkArgument( + !hasPartitionConfig(tuningConfig), + "targetCompactionSizeBytes[%s] cannot be used with targetPartitionSize[%s], maxTotalRows[%s]," + + " or numShards[%s] of tuningConfig", + targetCompactionSizeBytes, + tuningConfig == null ? null : tuningConfig.getTargetPartitionSize(), + tuningConfig == null ? null : tuningConfig.getMaxTotalRows(), + tuningConfig == null ? null : tuningConfig.getNumShards() + ); + return targetCompactionSizeBytes; + } else { + return hasPartitionConfig(tuningConfig) + ? 
null + : DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES; + } + } + + private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig) + { + if (tuningConfig != null) { + return tuningConfig.getTargetPartitionSize() != null + || tuningConfig.getMaxTotalRows() != null + || tuningConfig.getNumShards() != null; + } else { + return false; + } + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5d6bef1b859..f00cd0abc84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -427,7 +427,12 @@ public class IndexTask extends AbstractTask implements ChatHandler FileUtils.forceMkdir(firehoseTempDir); ingestionState = IngestionState.DETERMINE_PARTITIONS; - final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir); + + // Initialize maxRowsPerSegment and maxTotalRows lazily + final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig; + @Nullable final Integer targetPartitionSize = getValidTargetPartitionSize(tuningConfig); + @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig); + final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, targetPartitionSize); final DataSchema dataSchema; final Map versions; if (determineIntervals) { @@ -457,7 +462,16 @@ public class IndexTask extends AbstractTask implements ChatHandler } ingestionState = IngestionState.BUILD_SEGMENTS; - return generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir); + return generateAndPublishSegments( + toolbox, + dataSchema, + shardSpecs, + versions, + firehoseFactory, + firehoseTempDir, + targetPartitionSize, + maxTotalRows + ); } catch (Exception e) { log.error(e, "Encountered exception in %s.", ingestionState); @@ -583,7 +597,8 @@ public class IndexTask extends AbstractTask implements ChatHandler private ShardSpecs determineShardSpecs( final TaskToolbox toolbox, final FirehoseFactory firehoseFactory, - final File firehoseTempDir + final File firehoseTempDir, + @Nullable final Integer targetPartitionSize ) throws IOException { final ObjectMapper jsonMapper = toolbox.getObjectMapper(); @@ -618,7 +633,8 @@ public class IndexTask extends AbstractTask implements ChatHandler granularitySpec, tuningConfig, determineIntervals, - determineNumPartitions + determineNumPartitions, + targetPartitionSize ); } } @@ -666,7 +682,8 @@ public class IndexTask extends AbstractTask implements ChatHandler GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, boolean determineIntervals, - boolean determineNumPartitions + boolean determineNumPartitions, + @Nullable Integer targetPartitionSize ) throws IOException { log.info("Determining intervals and shardSpecs"); @@ -690,8 +707,10 @@ public class IndexTask extends AbstractTask implements ChatHandler final int numShards; if (determineNumPartitions) { - final long numRows = collector.estimateCardinalityRound(); - numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize()); + final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound(); + numShards = (int) Math.ceil( + (double) numRows / Preconditions.checkNotNull(targetPartitionSize, "targetPartitionSize") + ); 
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); } else { numShards = defaultNumShards; @@ -856,7 +875,9 @@ public class IndexTask extends AbstractTask implements ChatHandler final ShardSpecs shardSpecs, final Map versions, final FirehoseFactory firehoseFactory, - final File firehoseTempDir + final File firehoseTempDir, + @Nullable final Integer targetPartitionSize, + @Nullable final Long maxTotalRows ) throws IOException, InterruptedException { final GranularitySpec granularitySpec = dataSchema.getGranularitySpec(); @@ -1004,8 +1025,8 @@ public class IndexTask extends AbstractTask implements ChatHandler if (addResult.isOk()) { // incremental segment publishment is allowed only when rollup don't have to be perfect. if (!isGuaranteedRollup && - (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) || - exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig))) { + (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) || + exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator()))) { // There can be some segments waiting for being published even though any rows won't be added to them. // If those segments are not published here, the available space in appenderator will be kept to be small // which makes the size of segments smaller. @@ -1069,6 +1090,40 @@ public class IndexTask extends AbstractTask implements ChatHandler } } + + /** + * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null. + * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given + * {@link IndexTuningConfig#targetPartitionSize}. + */ + public static Integer getValidTargetPartitionSize(IndexTuningConfig tuningConfig) + { + @Nullable final Integer numShards = tuningConfig.numShards; + @Nullable final Integer targetPartitionSize = tuningConfig.targetPartitionSize; + if (numShards == null || numShards == -1) { + return targetPartitionSize == null || targetPartitionSize.equals(-1) + ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE + : targetPartitionSize; + } else { + return null; + } + } + + /** + * Return the valid max total rows. If {@link IndexTuningConfig#numShards} is valid, this returns null. + * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} or the given + * {@link IndexTuningConfig#maxTotalRows}. + */ + public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig) + { + @Nullable final Integer numShards = tuningConfig.numShards; + @Nullable final Long maxTotalRows = tuningConfig.maxTotalRows; + if (numShards == null || numShards == -1) { + return maxTotalRows == null ? 
IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; + } else { + return null; + } + } + private void handleParseException(ParseException e) { if (e.isFromPartiallyValidRow()) { @@ -1092,17 +1147,20 @@ public class IndexTask extends AbstractTask implements ChatHandler } } - private static boolean exceedMaxRowsInSegment(int numRowsInSegment, IndexTuningConfig indexTuningConfig) + private static boolean exceedMaxRowsInSegment( + @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig + int numRowsInSegment + ) { - // maxRowsInSegment should be null if numShards is set in indexTuningConfig - final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize(); return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment; } - private static boolean exceedMaxRowsInAppenderator(long numRowsInAppenderator, IndexTuningConfig indexTuningConfig) + private static boolean exceedMaxRowsInAppenderator( + // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig + @Nullable final Long maxRowsInAppenderator, + long numRowsInAppenderator + ) { - // maxRowsInAppenderator should be null if numShards is set in indexTuningConfig - final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows(); return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator; } @@ -1271,7 +1329,9 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; + static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000; + static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; + private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; @@ -1279,12 +1339,13 @@ public class IndexTask extends AbstractTask implements ChatHandler private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUSH_TIMEOUT = 0; - static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; - + @Nullable private final Integer targetPartitionSize; private final int maxRowsInMemory; private final long maxBytesInMemory; + @Nullable private final Long maxTotalRows; + @Nullable private final Integer numShards; private final IndexSpec indexSpec; private final File basePersistDirectory; @@ -1312,6 +1373,11 @@ public class IndexTask extends AbstractTask implements ChatHandler @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + public static IndexTuningConfig createDefault() + { + return new IndexTuningConfig(); + } + @JsonCreator public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, @@ -1385,12 +1451,14 @@ public class IndexTask extends AbstractTask implements ChatHandler "targetPartitionSize and numShards cannot both be set" ); - this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize); + this.targetPartitionSize = (targetPartitionSize != null && targetPartitionSize == -1) + ? null + : targetPartitionSize; this.maxRowsInMemory = maxRowsInMemory == null ? 
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; // initializing this to 0, it will be lazily initialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; - this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows); + this.maxTotalRows = maxTotalRows; this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; @@ -1422,26 +1490,6 @@ public class IndexTask extends AbstractTask implements ChatHandler : logParseExceptions; } - private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize) - { - if (numShards == null || numShards == -1) { - return targetPartitionSize == null || targetPartitionSize.equals(-1) - ? DEFAULT_TARGET_PARTITION_SIZE - : targetPartitionSize; - } else { - return null; - } - } - - private static Long initializeMaxTotalRows(Integer numShards, Long maxTotalRows) - { - if (numShards == null || numShards == -1) { - return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; - } else { - return null; - } - } - public IndexTuningConfig withBasePersistDirectory(File dir) { return new IndexTuningConfig( @@ -1464,6 +1512,33 @@ public class IndexTask extends AbstractTask implements ChatHandler ); } + public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize) + { + return new IndexTuningConfig( + targetPartitionSize, + maxRowsInMemory, + maxBytesInMemory, + maxTotalRows, + numShards, + indexSpec, + maxPendingPersists, + forceExtendableShardSpecs, + forceGuaranteedRollup, + reportParseExceptions, + pushTimeout, + basePersistDirectory, + segmentWriteOutMediumFactory, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + /** + * Return the target number of rows per segment. This returns null if it's not specified in tuningConfig. + * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get the valid value. + */ + @Nullable @JsonProperty public Integer getTargetPartitionSize() { @@ -1484,6 +1559,10 @@ public class IndexTask extends AbstractTask implements ChatHandler return maxBytesInMemory; } + /** + * Return the max number of total rows in appenderator. This returns null if it's not specified in tuningConfig. + * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the valid value. 
+ */ @JsonProperty @Override @Nullable @@ -1633,5 +1712,28 @@ public class IndexTask extends AbstractTask implements ChatHandler maxSavedParseExceptions ); } + + @Override + public String toString() + { + return "IndexTuningConfig{" + + "targetPartitionSize=" + targetPartitionSize + + ", maxRowsInMemory=" + maxRowsInMemory + + ", maxBytesInMemory=" + maxBytesInMemory + + ", maxTotalRows=" + maxTotalRows + + ", numShards=" + numShards + + ", indexSpec=" + indexSpec + + ", basePersistDirectory=" + basePersistDirectory + + ", maxPendingPersists=" + maxPendingPersists + + ", forceExtendableShardSpecs=" + forceExtendableShardSpecs + + ", forceGuaranteedRollup=" + forceGuaranteedRollup + + ", reportParseExceptions=" + reportParseExceptions + + ", pushTimeout=" + pushTimeout + + ", logParseExceptions=" + logParseExceptions + + ", maxParseExceptions=" + maxParseExceptions + + ", maxSavedParseExceptions=" + maxSavedParseExceptions + + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + '}'; + } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index 46cf03e65e8..20871afc6d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -303,7 +303,10 @@ public class ParallelIndexSubTask extends AbstractTask ); } + // Initialize maxRowsPerSegment and maxTotalRows lazily final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); + @Nullable final Integer targetPartitionSize = IndexTask.getValidTargetPartitionSize(tuningConfig); + @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig); final long pushTimeout = tuningConfig.getPushTimeout(); final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient, ingestionSchema); @@ -348,8 +351,8 @@ public class ParallelIndexSubTask extends AbstractTask final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName); if (addResult.isOk()) { - if (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) || - exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig)) { + if (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) || + exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator())) { // There can be some segments waiting for being published even though any rows won't be added to them. // If those segments are not published here, the available space in appenderator will be kept to be small // which makes the size of segments smaller. 
@@ -384,22 +387,19 @@ public class ParallelIndexSubTask extends AbstractTask } private static boolean exceedMaxRowsInSegment( - int numRowsInSegment, - ParallelIndexTuningConfig indexTuningConfig + @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig + int numRowsInSegment ) { - // maxRowsInSegment should be null if numShards is set in indexTuningConfig - final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize(); return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment; } private static boolean exceedMaxRowsInAppenderator( - long numRowsInAppenderator, - ParallelIndexTuningConfig indexTuningConfig + // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig + @Nullable Long maxRowsInAppenderator, + long numRowsInAppenderator ) { - // maxRowsInAppenderator should be null if numShards is set in indexTuningConfig - final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows(); return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 381b51c549a..5cd5f1fbdbd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -28,8 +28,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.inject.Binder; -import com.google.inject.Module; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -49,6 +47,7 @@ import org.apache.druid.indexing.common.actions.SegmentListUsedAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; @@ -59,6 +58,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; @@ -71,8 +71,14 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SimpleQueryableIndex; +import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.column.Column; -import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import 
org.apache.druid.segment.column.ComplexColumn; +import org.apache.druid.segment.column.DictionaryEncodedColumn; +import org.apache.druid.segment.column.GenericColumn; +import org.apache.druid.segment.column.SpatialIndex; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; @@ -84,6 +90,7 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.transform.TransformingInputRowParser; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.security.AuthTestUtils; @@ -120,6 +127,8 @@ import java.util.stream.IntStream; @RunWith(Parameterized.class) public class CompactionTaskTest { + private static final long SEGMENT_SIZE_BYTES = 100; + private static final int NUM_ROWS_PER_SEGMENT = 10; private static final String DATA_SOURCE = "dataSource"; private static final String TIMESTAMP_COLUMN = "timestamp"; private static final String MIXED_TYPE_COLUMN = "string_to_double"; @@ -204,7 +213,7 @@ public class CompactionTaskTest new ArrayList<>(AGGREGATORS.keySet()), new NumberedShardSpec(0, 1), 0, - 1 + SEGMENT_SIZE_BYTES ), new File("file_" + i) ); @@ -225,16 +234,11 @@ public class CompactionTaskTest ); GuiceInjectableValues injectableValues = new GuiceInjectableValues( GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); - binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); - binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); - } + ImmutableList.of( + binder -> { + binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); + binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); + binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); } ) ) @@ -265,7 +269,7 @@ public class CompactionTaskTest private static IndexTuningConfig createTuningConfig() { return new IndexTuningConfig( - 5000000, + null, // null to compute targetPartitionSize automatically 500000, 1000000L, null, @@ -329,6 +333,7 @@ public class CompactionTaskTest null, null, null, + null, createTuningConfig(), ImmutableMap.of("testKey", "testContext"), objectMapper, @@ -359,6 +364,7 @@ public class CompactionTaskTest SEGMENTS, null, null, + null, createTuningConfig(), ImmutableMap.of("testKey", "testContext"), objectMapper, @@ -373,19 +379,21 @@ public class CompactionTaskTest Assert.assertEquals(task.getInterval(), fromJson.getInterval()); Assert.assertEquals(task.getSegments(), fromJson.getSegments()); Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec()); + Assert.assertEquals(task.isKeepSegmentGranularity(), fromJson.isKeepSegmentGranularity()); + Assert.assertEquals(task.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(task.getContext(), 
fromJson.getContext()); } @Test - public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException + public void testCreateIngestionSchema() throws IOException, SegmentLoadingException { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, TUNING_CONFIG), null, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( @@ -393,6 +401,12 @@ public class CompactionTaskTest ); if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); Assert.assertEquals(5, ingestionSpecs.size()); assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); } else { @@ -402,14 +416,39 @@ public class CompactionTaskTest } @Test - public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException + public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException { + final IndexTuningConfig tuningConfig = new IndexTuningConfig( + 5, + 500000, + 1000000L, + null, + null, + null, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 5000, + true, + false, + true, + false, + null, + 100L, + null, + null, + null, + null + ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, tuningConfig), null, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( @@ -417,11 +456,142 @@ public class CompactionTaskTest ); if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); + assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); } else { Assert.assertEquals(1, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL)); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + Collections.singletonList(COMPACTION_INTERVAL), + tuningConfig + ); + } + } + + @Test + public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException + { + final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, + 500000, + 1000000L, + 5L, + null, + null, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 5000, + true, + false, + true, + false, + null, + 100L, + null, + null, + null, + null + ); + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, tuningConfig), + null, + keepSegmentGranularity, + objectMapper 
+ ); + final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( + keepSegmentGranularity + ); + + if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); + Assert.assertEquals(5, ingestionSpecs.size()); + assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); + } else { + Assert.assertEquals(1, ingestionSpecs.size()); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + Collections.singletonList(COMPACTION_INTERVAL), + tuningConfig + ); + } + } + + @Test + public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException + { + final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, + 500000, + 1000000L, + null, + null, + 3, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 5000, + true, + false, + true, + false, + null, + 100L, + null, + null, + null, + null + ); + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, tuningConfig), + null, + keepSegmentGranularity, + objectMapper + ); + final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( + keepSegmentGranularity + ); + + if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); + Assert.assertEquals(5, ingestionSpecs.size()); + assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); + } else { + Assert.assertEquals(1, ingestionSpecs.size()); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + Collections.singletonList(COMPACTION_INTERVAL), + tuningConfig + ); } } @@ -458,13 +628,19 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, TUNING_CONFIG), customSpec, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); Assert.assertEquals(5, ingestionSpecs.size()); final List dimensionsSpecs = new ArrayList<>(5); IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec)); @@ -489,9 +665,9 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(SEGMENTS), + new PartitionConfigurationManager(null, TUNING_CONFIG), null, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( @@ -499,6 +675,12 @@ public class CompactionTaskTest ); if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + 
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); Assert.assertEquals(5, ingestionSpecs.size()); assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); } else { @@ -518,9 +700,9 @@ public class CompactionTaskTest CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(segments), + new PartitionConfigurationManager(null, TUNING_CONFIG), null, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); } @@ -537,9 +719,9 @@ public class CompactionTaskTest CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(segments), + new PartitionConfigurationManager(null, TUNING_CONFIG), null, keepSegmentGranularity, - TUNING_CONFIG, objectMapper ); } @@ -560,6 +742,7 @@ public class CompactionTaskTest null, null, null, + null, objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), @@ -567,6 +750,46 @@ public class CompactionTaskTest ); } + @Test + public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException + { + final IndexTuningConfig tuningConfig = new IndexTuningConfig( + 5, + 500000, + 1000000L, + null, + null, + null, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 5000, + true, + false, + true, + false, + null, + 100L, + null, + null, + null, + null + ); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be used with"); + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(5L, tuningConfig), + null, + keepSegmentGranularity, + objectMapper + ); + } + private static List getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity) { if (keepSegmentGranularity) { @@ -617,6 +840,45 @@ public class CompactionTaskTest List expectedDimensionsSpecs, List expectedSegmentIntervals ) + { + assertIngestionSchema( + ingestionSchemas, + expectedDimensionsSpecs, + expectedSegmentIntervals, + new IndexTuningConfig( + 41943040, // automatically computed targetPartitionSize + 500000, + 1000000L, + null, + null, + null, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 5000, + true, + false, + true, + false, + null, + 100L, + null, + null, + null, + null + ) + ); + } + + private static void assertIngestionSchema( + List ingestionSchemas, + List expectedDimensionsSpecs, + List expectedSegmentIntervals, + IndexTuningConfig expectedTuningConfig + ) { Preconditions.checkArgument( ingestionSchemas.size() == expectedDimensionsSpecs.size(), @@ -677,7 +939,7 @@ public class CompactionTaskTest ); // assert tuningConfig - Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig()); + Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig()); } } @@ -841,17 +1103,74 @@ public class CompactionTaskTest private static Column createColumn(DimensionSchema dimensionSchema) { - return new ColumnBuilder() - .setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType())) - .setDictionaryEncodedColumn(() -> null) - .setBitmapIndex(() -> null) - .build(); + return new TestColumn(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType())); } private static Column createColumn(AggregatorFactory aggregatorFactory) { - return new ColumnBuilder() - 
.setType(ValueType.fromString(aggregatorFactory.getTypeName())) - .build(); + return new TestColumn(ValueType.fromString(aggregatorFactory.getTypeName())); + } + + private static class TestColumn implements Column + { + private final ColumnCapabilities columnCapabilities; + + TestColumn(ValueType type) + { + columnCapabilities = new ColumnCapabilitiesImpl() + .setType(type) + .setDictionaryEncoded(type == ValueType.STRING) // set a fake value to make string columns + .setHasBitmapIndexes(type == ValueType.STRING) + .setHasSpatialIndexes(false) + .setHasMultipleValues(false); + } + + @Override + public ColumnCapabilities getCapabilities() + { + return columnCapabilities; + } + + @Override + public int getLength() + { + return NUM_ROWS_PER_SEGMENT; + } + + @Override + public DictionaryEncodedColumn getDictionaryEncoding() + { + return null; + } + + @Override + public GenericColumn getGenericColumn() + { + return null; + } + + @Override + public ComplexColumn getComplexColumn() + { + return null; + } + + @Override + public BitmapIndex getBitmapIndex() + { + return null; + } + + @Override + public SpatialIndex getSpatialIndex() + { + return null; + } + + @Override + public SettableColumnValueSelector makeSettableColumnValueSelector() + { + return null; + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index a2c742e84f1..31e25168174 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -106,14 +106,14 @@ public class TaskSerdeTest IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs()); - Assert.assertEquals(false, tuningConfig.isReportParseExceptions()); + Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs()); + Assert.assertFalse(tuningConfig.isReportParseExceptions()); Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec()); Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod()); Assert.assertEquals(0, tuningConfig.getMaxPendingPersists()); Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory()); - Assert.assertEquals(null, tuningConfig.getNumShards()); - Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getNumShards()); + Assert.assertNull(tuningConfig.getTargetPartitionSize()); } @Test @@ -125,14 +125,14 @@ public class TaskSerdeTest ); Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize()); - Assert.assertEquals(null, tuningConfig.getNumShards()); + Assert.assertNull(tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( "{\"type\":\"index\", \"numShards\":10}", IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(null, tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getTargetPartitionSize()); Assert.assertEquals(10, (int) tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( @@ -140,7 +140,7 @@ public class TaskSerdeTest IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(null, tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getTargetPartitionSize()); Assert.assertEquals(10, (int) tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( @@ -148,7 +148,7 @@ public class TaskSerdeTest 
IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(null, tuningConfig.getNumShards()); + Assert.assertNull(tuningConfig.getNumShards()); Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize()); tuningConfig = jsonMapper.readValue( @@ -156,11 +156,8 @@ public class TaskSerdeTest IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(null, tuningConfig.getNumShards()); - Assert.assertEquals( - IndexTask.IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE, - (int) tuningConfig.getTargetPartitionSize() - ); + Assert.assertNull(tuningConfig.getNumShards()); + Assert.assertNull(tuningConfig.getTargetPartitionSize()); } @Test diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 1ddcc9e6e73..6bc13365e6b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -31,9 +31,10 @@ import java.util.Objects; public class DataSourceCompactionConfig { + public static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB + // should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25; - private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150; private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
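
For reference, the estimation added in PartitionConfigurationManager.computeTuningConfig above can be summarized as: targetPartitionSize (rows per compacted segment) = (total rows in the input segments / total bytes in the input segments) * targetCompactionSizeBytes, i.e. segment size is assumed to be proportional to row count. The standalone Java sketch below illustrates only that arithmetic; the class and method names in it are illustrative and are not part of the patch.

import java.util.Arrays;
import java.util.List;

public class TargetPartitionSizeEstimator
{
  // Minimal stand-in for the (numRows, sizeBytes) statistics of one input segment.
  static class SegmentStats
  {
    final long numRows;
    final long sizeBytes;

    SegmentStats(long numRows, long sizeBytes)
    {
      this.numRows = numRows;
      this.sizeBytes = sizeBytes;
    }
  }

  // Estimate the number of rows per compacted segment so that each segment is roughly
  // targetCompactionSizeBytes, assuming segment size is proportional to row count.
  static int estimateTargetPartitionSize(List<SegmentStats> segments, long targetCompactionSizeBytes)
  {
    final long totalNumRows = segments.stream().mapToLong(s -> s.numRows).sum();
    final long totalSizeBytes = segments.stream().mapToLong(s -> s.sizeBytes).sum();
    if (totalSizeBytes == 0L) {
      throw new IllegalStateException("Total input segment size is 0 bytes");
    }
    final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
    return Math.toIntExact(Math.round(avgRowsPerByte * targetCompactionSizeBytes));
  }

  public static void main(String[] args)
  {
    // Five segments of 10 rows and 100 bytes each, matching the CompactionTaskTest fixtures,
    // with the 400MB DEFAULT_TARGET_COMPACTION_SIZE_BYTES: 0.1 rows/byte * 419430400 bytes
    // = 41943040 rows per segment, the value the test expects as the computed targetPartitionSize.
    final List<SegmentStats> segments = Arrays.asList(
        new SegmentStats(10, 100),
        new SegmentStats(10, 100),
        new SegmentStats(10, 100),
        new SegmentStats(10, 100),
        new SegmentStats(10, 100)
    );
    System.out.println(estimateTargetPartitionSize(segments, 400L * 1024 * 1024));
  }
}

In the patch itself this estimate is applied via IndexTuningConfig.withTargetPartitionSize, and it is only computed when the supplied tuningConfig does not already set targetPartitionSize, maxTotalRows, or numShards; otherwise the given tuningConfig is used as-is.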