Add support targetCompactionSizeBytes for compactionTask (#6203)

* Add support targetCompactionSizeBytes for compactionTask

* fix test

* fix a bug in keepSegmentGranularity

* fix wrong noinspection comment

* address comments
Jihoon Son 2018-09-28 11:16:35 -07:00 committed by GitHub
parent 6f44e568db
commit 122caec7b1
7 changed files with 694 additions and 135 deletions
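The heart of the change: a compaction task can now be told how large the compacted segments should be in bytes (targetCompactionSizeBytes) instead of how many rows they should hold. The task inspects the segments it is about to compact, computes an average rows-per-byte ratio, and converts the byte target into the row-based targetPartitionSize the underlying index task understands. A minimal, self-contained sketch of that estimation (class and field names here are illustrative, not the task's actual API):

import java.util.List;

class TargetPartitionSizeEstimator
{
  // Illustrative stand-in for a (numRows, sizeBytes) pair read from each input segment.
  static class SegmentStats
  {
    final long numRows;
    final long sizeBytes;

    SegmentStats(long numRows, long sizeBytes)
    {
      this.numRows = numRows;
      this.sizeBytes = sizeBytes;
    }
  }

  // Assume segment size is proportional to row count, as the patch does.
  static int estimateTargetPartitionSize(List<SegmentStats> segments, long targetCompactionSizeBytes)
  {
    final long totalNumRows = segments.stream().mapToLong(s -> s.numRows).sum();
    final long totalSizeBytes = segments.stream().mapToLong(s -> s.sizeBytes).sum();
    if (totalSizeBytes == 0L) {
      throw new IllegalStateException("Total input segment size is 0 byte");
    }
    final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
    return Math.toIntExact(Math.round(avgRowsPerByte * targetCompactionSizeBytes));
  }
}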

PartitionHolder.java

@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.Spliterator;
import java.util.TreeSet;
/**
@@ -130,6 +131,12 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
return holderSet.iterator();
}
@Override
public Spliterator<PartitionChunk<T>> spliterator()
{
return holderSet.spliterator();
}
public Iterable<T> payloads()
{
return Iterables.transform(this, PartitionChunk::getObject);
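Overriding spliterator() to delegate to the backing sorted set matters because the compaction changes below stream over partition chunks; the default Iterable.spliterator() wraps the iterator and reports no size or characteristics. A small stand-alone illustration of the difference, using a plain TreeSet in place of PartitionHolder's holderSet (hypothetical demo, not Druid code):

import java.util.Spliterator;
import java.util.TreeSet;
import java.util.stream.StreamSupport;

public class SpliteratorDelegationDemo
{
  public static void main(String[] args)
  {
    TreeSet<Integer> backing = new TreeSet<>();
    backing.add(3);
    backing.add(1);
    backing.add(2);

    // Delegating to the TreeSet keeps SORTED/DISTINCT/SIZED characteristics.
    Spliterator<Integer> delegated = backing.spliterator();
    System.out.println("delegated sorted=" + delegated.hasCharacteristics(Spliterator.SORTED)); // true

    // The default Iterable.spliterator() (what the class used before) reports none of them.
    Spliterator<Integer> fallback = ((Iterable<Integer>) backing::iterator).spliterator();
    System.out.println("fallback sorted=" + fallback.hasCharacteristics(Spliterator.SORTED)); // false

    System.out.println(StreamSupport.stream(backing.spliterator(), false).count()); // 3
  }
}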

CompactionTask.java

@@ -68,6 +68,7 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@@ -86,6 +87,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -101,13 +103,15 @@ public class CompactionTask extends AbstractTask
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
private final boolean keepSegmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
private final IndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
-private List<IndexTask> indexTaskSpecs;
private final PartitionConfigurationManager partitionConfigurationManager;
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@@ -118,6 +122,9 @@ public class CompactionTask extends AbstractTask
@JsonIgnore
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@JsonIgnore
private List<IndexTask> indexTaskSpecs;
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@@ -127,6 +134,7 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
@Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
@Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@@ -149,9 +157,11 @@ public class CompactionTask extends AbstractTask
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
: keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -181,6 +191,14 @@ public class CompactionTask extends AbstractTask
return keepSegmentGranularity;
}
@Nullable
@JsonProperty
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@Nullable
@JsonProperty
public IndexTuningConfig getTuningConfig()
{
@@ -220,9 +238,9 @@ public class CompactionTask extends AbstractTask
indexTaskSpecs = createIngestionSchema(
toolbox,
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
keepSegmentGranularity,
-tuningConfig,
jsonMapper
).stream()
.map(spec -> new IndexTask(
@@ -271,12 +289,12 @@ public class CompactionTask extends AbstractTask
*/
@VisibleForTesting
static List<IndexIngestionSpec> createIngestionSchema(
-TaskToolbox toolbox,
-SegmentProvider segmentProvider,
-DimensionsSpec dimensionsSpec,
-boolean keepSegmentGranularity,
-IndexTuningConfig tuningConfig,
-ObjectMapper jsonMapper
final TaskToolbox toolbox,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
final DimensionsSpec dimensionsSpec,
final boolean keepSegmentGranularity,
final ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
{
Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@@ -290,26 +308,52 @@ public class CompactionTask extends AbstractTask
return Collections.emptyList();
}
// find metadata for interval
final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
);
final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(
queryableIndexAndSegments
);
if (keepSegmentGranularity) {
-// if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
// If keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
// task per segment interval.
-final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
-for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
//noinspection unchecked,ConstantConditions
final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = queryableIndexAndSegments
.stream()
.collect(
Collectors.toMap(
// rhs can't be null here so we skip null checking and supress the warning with the above comment
p -> p.rhs.getInterval(),
Lists::newArrayList,
(l1, l2) -> {
l1.addAll(l2);
return l1;
}
)
);
final List<IndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.getValue();
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
-holder.getInterval(),
-Collections.singletonList(holder),
interval,
segmentsToCompact,
dimensionsSpec,
-toolbox.getIndexIO(),
-jsonMapper,
-segmentFileMap
jsonMapper
);
specs.add(
new IndexIngestionSpec(
dataSchema,
-createIoConfig(toolbox, dataSchema, holder.getInterval()),
createIoConfig(toolbox, dataSchema, interval),
-tuningConfig
compactionTuningConfig
)
);
}
@@ -319,18 +363,16 @@ public class CompactionTask extends AbstractTask
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
-timelineSegments,
queryableIndexAndSegments,
dimensionsSpec,
-toolbox.getIndexIO(),
-jsonMapper,
-segmentFileMap
jsonMapper
);
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
-tuningConfig
compactionTuningConfig
)
);
}
@@ -368,21 +410,11 @@ public class CompactionTask extends AbstractTask
private static DataSchema createDataSchema(
String dataSource,
Interval totalInterval,
-List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
DimensionsSpec dimensionsSpec,
-IndexIO indexIO,
-ObjectMapper jsonMapper,
-Map<DataSegment, File> segmentFileMap
ObjectMapper jsonMapper
)
-throws IOException
{
-// find metadata for interval
-final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
-timelineObjectHolder,
-segmentFileMap,
-indexIO
-);
// find merged aggregators
for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
@@ -621,4 +653,105 @@ public class CompactionTask extends AbstractTask
return usedSegments;
}
}
@VisibleForTesting
static class PartitionConfigurationManager
{
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
private final IndexTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, @Nullable IndexTuningConfig tuningConfig)
{
this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig);
this.tuningConfig = tuningConfig;
}
@Nullable
IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments)
{
if (!hasPartitionConfig(tuningConfig)) {
final long nonNullTargetCompactionSizeBytes = Preconditions.checkNotNull(
targetCompactionSizeBytes,
"targetCompactionSizeBytes"
);
// Find IndexTuningConfig.targetPartitionSize which is the number of rows per segment.
// Assume that the segment size is proportional to the number of rows. We can improve this later.
final long totalNumRows = queryableIndexAndSegments
.stream()
.mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.lhs.getNumRows())
.sum();
final long totalSizeBytes = queryableIndexAndSegments
.stream()
.mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.rhs.getSize())
.sum();
if (totalSizeBytes == 0L) {
throw new ISE("Total input segment size is 0 byte");
}
final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
Preconditions.checkState(targetPartitionSize > 0, "Negative targetPartitionSize[%s]", targetPartitionSize);
log.info(
"Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]",
targetPartitionSize,
avgRowsPerByte,
nonNullTargetCompactionSizeBytes
);
return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig)
.withTargetPartitionSize(targetPartitionSize);
} else {
return tuningConfig;
}
}
/**
* Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that
* targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#targetPartitionSize},
* {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
* {@link #hasPartitionConfig} checks one of those configs is set.
*
* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
* returns true. If targetCompactionSizeBytes is not set, this returns null or
* {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
* hasPartitionConfig.
*/
@Nullable
private static Long getValidTargetCompactionSizeBytes(
@Nullable Long targetCompactionSizeBytes,
@Nullable IndexTuningConfig tuningConfig
)
{
if (targetCompactionSizeBytes != null) {
Preconditions.checkArgument(
!hasPartitionConfig(tuningConfig),
"targetCompactionSizeBytes[%s] cannot be used with targetPartitionSize[%s], maxTotalRows[%s],"
+ " or numShards[%s] of tuningConfig",
targetCompactionSizeBytes,
tuningConfig == null ? null : tuningConfig.getTargetPartitionSize(),
tuningConfig == null ? null : tuningConfig.getMaxTotalRows(),
tuningConfig == null ? null : tuningConfig.getNumShards()
);
return targetCompactionSizeBytes;
} else {
return hasPartitionConfig(tuningConfig)
? null
: DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
}
}
private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig)
{
if (tuningConfig != null) {
return tuningConfig.getTargetPartitionSize() != null
|| tuningConfig.getMaxTotalRows() != null
|| tuningConfig.getNumShards() != null;
} else {
return false;
}
}
}
}
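To make computeTuningConfig's estimate concrete: if the chosen interval holds 5,000,000 rows across 1 GiB of segments and targetCompactionSizeBytes is the 400 MiB default, each compacted segment should get roughly 1.95 million rows. A small arithmetic check (the figures are hypothetical; only the formula mirrors the method above):

public class CompactionSizeMath
{
  public static void main(String[] args)
  {
    final long totalNumRows = 5_000_000L;
    final long totalSizeBytes = 1024L * 1024L * 1024L;           // 1 GiB of input segments
    final long targetCompactionSizeBytes = 400L * 1024L * 1024L; // 400 MiB target

    final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
    final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * targetCompactionSizeBytes));

    // Prints 1953125: each compacted segment should hold about 1.95M rows
    // so that it lands near the 400 MiB size target.
    System.out.println(targetPartitionSize);
  }
}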

IndexTask.java

@@ -427,7 +427,12 @@ public class IndexTask extends AbstractTask implements ChatHandler
FileUtils.forceMkdir(firehoseTempDir);
ingestionState = IngestionState.DETERMINE_PARTITIONS;
-final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir);
// Initialize maxRowsPerSegment and maxTotalRows lazily
final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
@Nullable final Integer targetPartitionSize = getValidTargetPartitionSize(tuningConfig);
@Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, targetPartitionSize);
final DataSchema dataSchema;
final Map<Interval, String> versions;
if (determineIntervals) {
@@ -457,7 +462,16 @@
}
ingestionState = IngestionState.BUILD_SEGMENTS;
-return generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir);
return generateAndPublishSegments(
toolbox,
dataSchema,
shardSpecs,
versions,
firehoseFactory,
firehoseTempDir,
targetPartitionSize,
maxTotalRows
);
}
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
@@ -583,7 +597,8 @@
private ShardSpecs determineShardSpecs(
final TaskToolbox toolbox,
final FirehoseFactory firehoseFactory,
-final File firehoseTempDir
final File firehoseTempDir,
@Nullable final Integer targetPartitionSize
) throws IOException
{
final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -618,7 +633,8 @@
granularitySpec,
tuningConfig,
determineIntervals,
-determineNumPartitions
determineNumPartitions,
targetPartitionSize
);
}
}
@@ -666,7 +682,8 @@
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean determineIntervals,
-boolean determineNumPartitions
boolean determineNumPartitions,
@Nullable Integer targetPartitionSize
) throws IOException
{
log.info("Determining intervals and shardSpecs");
@@ -690,8 +707,10 @@
final int numShards;
if (determineNumPartitions) {
-final long numRows = collector.estimateCardinalityRound();
final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound();
-numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
numShards = (int) Math.ceil(
(double) numRows / Preconditions.checkNotNull(targetPartitionSize, "targetPartitionSize")
);
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
} else {
numShards = defaultNumShards;
@@ -856,7 +875,9 @@
final ShardSpecs shardSpecs,
final Map<Interval, String> versions,
final FirehoseFactory firehoseFactory,
-final File firehoseTempDir
final File firehoseTempDir,
@Nullable final Integer targetPartitionSize,
@Nullable final Long maxTotalRows
) throws IOException, InterruptedException
{
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
@@ -1004,8 +1025,8 @@
if (addResult.isOk()) {
// incremental segment publishment is allowed only when rollup don't have to be perfect.
if (!isGuaranteedRollup &&
-(exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) ||
-exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig))) {
(exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator()))) {
// There can be some segments waiting for being published even though any rows won't be added to them.
// If those segments are not published here, the available space in appenderator will be kept to be small
// which makes the size of segments smaller.
@@ -1069,6 +1090,40 @@
}
}
/**
* Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
* Otherwise, this returns {@link IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given
* {@link IndexTuningConfig#targetPartitionSize}.
*/
public static Integer getValidTargetPartitionSize(IndexTuningConfig tuningConfig)
{
@Nullable final Integer numShards = tuningConfig.numShards;
@Nullable final Integer targetPartitionSize = tuningConfig.targetPartitionSize;
if (numShards == null || numShards == -1) {
return targetPartitionSize == null || targetPartitionSize.equals(-1)
? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE
: targetPartitionSize;
} else {
return null;
}
}
/**
* Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
* Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} or the given
* {@link IndexTuningConfig#maxTotalRows}.
*/
public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig)
{
@Nullable final Integer numShards = tuningConfig.numShards;
@Nullable final Long maxTotalRows = tuningConfig.maxTotalRows;
if (numShards == null || numShards == -1) {
return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
} else {
return null;
}
}
private void handleParseException(ParseException e)
{
if (e.isFromPartiallyValidRow()) {
@@ -1092,17 +1147,20 @@
}
}
-private static boolean exceedMaxRowsInSegment(int numRowsInSegment, IndexTuningConfig indexTuningConfig)
private static boolean exceedMaxRowsInSegment(
@Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
int numRowsInSegment
)
{
-// maxRowsInSegment should be null if numShards is set in indexTuningConfig
-final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize();
return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
}
-private static boolean exceedMaxRowsInAppenderator(long numRowsInAppenderator, IndexTuningConfig indexTuningConfig)
private static boolean exceedMaxRowsInAppenderator(
// maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
@Nullable final Long maxRowsInAppenderator,
long numRowsInAppenderator
)
{
-// maxRowsInAppenderator should be null if numShards is set in indexTuningConfig
-final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
}
@@ -1271,7 +1329,9 @@
@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
{
-private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
@@ -1279,12 +1339,13 @@
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUSH_TIMEOUT = 0;
-static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
@Nullable
private final Integer targetPartitionSize;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
@Nullable
private final Long maxTotalRows;
@Nullable
private final Integer numShards;
private final IndexSpec indexSpec;
private final File basePersistDirectory;
@@ -1312,6 +1373,11 @@
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
public static IndexTuningConfig createDefault()
{
return new IndexTuningConfig();
}
@JsonCreator
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
@@ -1385,12 +1451,14 @@
"targetPartitionSize and numShards cannot both be set"
);
-this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize);
this.targetPartitionSize = (targetPartitionSize != null && targetPartitionSize == -1)
? null
: targetPartitionSize;
this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows);
this.maxTotalRows = maxTotalRows;
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
@@ -1422,26 +1490,6 @@
: logParseExceptions;
}
-private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize)
-{
-if (numShards == null || numShards == -1) {
-return targetPartitionSize == null || targetPartitionSize.equals(-1)
-? DEFAULT_TARGET_PARTITION_SIZE
-: targetPartitionSize;
-} else {
-return null;
-}
-}
-private static Long initializeMaxTotalRows(Integer numShards, Long maxTotalRows)
-{
-if (numShards == null || numShards == -1) {
-return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-} else {
-return null;
-}
-}
public IndexTuningConfig withBasePersistDirectory(File dir)
{
return new IndexTuningConfig(
@@ -1464,6 +1512,33 @@
);
}
public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize)
{
return new IndexTuningConfig(
targetPartitionSize,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
numShards,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
forceGuaranteedRollup,
reportParseExceptions,
pushTimeout,
basePersistDirectory,
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
}
/**
* Return the target number of rows per segment. This returns null if it's not specified in tuningConfig.
* Please use {@link IndexTask#getValidTargetPartitionSize} instead to get the valid value.
*/
@Nullable
@JsonProperty
public Integer getTargetPartitionSize()
{
@@ -1484,6 +1559,10 @@
return maxBytesInMemory;
}
/**
* Return the max number of total rows in appenderator. This returns null if it's not specified in tuningConfig.
* Please use {@link IndexTask#getValidMaxTotalRows} instead to get the valid value.
*/
@JsonProperty
@Override
@Nullable
@@ -1633,5 +1712,28 @@
maxSavedParseExceptions
);
}
@Override
public String toString()
{
return "IndexTuningConfig{" +
"targetPartitionSize=" + targetPartitionSize +
", maxRowsInMemory=" + maxRowsInMemory +
", maxBytesInMemory=" + maxBytesInMemory +
", maxTotalRows=" + maxTotalRows +
", numShards=" + numShards +
", indexSpec=" + indexSpec +
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", forceExtendableShardSpecs=" + forceExtendableShardSpecs +
", forceGuaranteedRollup=" + forceGuaranteedRollup +
", reportParseExceptions=" + reportParseExceptions +
", pushTimeout=" + pushTimeout +
", logParseExceptions=" + logParseExceptions +
", maxParseExceptions=" + maxParseExceptions +
", maxSavedParseExceptions=" + maxSavedParseExceptions +
", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
'}';
}
}
}
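getValidTargetPartitionSize and getValidMaxTotalRows move the defaulting that used to live in the IndexTuningConfig constructor to call time: when numShards is unset (null or -1) the row limits fall back to their defaults, and when numShards is set both resolve to null so the explicit shard count drives partitioning. A hedged stand-alone sketch of that precedence (plain Java, not the task's own types):

public class LazyLimitResolution
{
  static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
  // maxTotalRows follows the same pattern with a 20,000,000 default.
  static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000L;

  // Mirrors the precedence: numShards wins over any row-based limit.
  static Integer resolveTargetPartitionSize(Integer numShards, Integer targetPartitionSize)
  {
    if (numShards == null || numShards == -1) {
      return targetPartitionSize == null || targetPartitionSize == -1
             ? DEFAULT_TARGET_PARTITION_SIZE
             : targetPartitionSize;
    }
    return null;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTargetPartitionSize(null, null)); // 5000000 (default applies)
    System.out.println(resolveTargetPartitionSize(null, 10));   // 10 (explicit value wins)
    System.out.println(resolveTargetPartitionSize(10, null));   // null (numShards drives partitioning)
  }
}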

ParallelIndexSubTask.java

@@ -303,7 +303,10 @@
);
}
// Initialize maxRowsPerSegment and maxTotalRows lazily
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
@Nullable final Integer targetPartitionSize = IndexTask.getValidTargetPartitionSize(tuningConfig);
@Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig);
final long pushTimeout = tuningConfig.getPushTimeout();
final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient, ingestionSchema);
@@ -348,8 +351,8 @@
final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
if (addResult.isOk()) {
-if (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) ||
-exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig)) {
if (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator())) {
// There can be some segments waiting for being published even though any rows won't be added to them.
// If those segments are not published here, the available space in appenderator will be kept to be small
// which makes the size of segments smaller.
@@ -384,22 +387,19 @@
}
private static boolean exceedMaxRowsInSegment(
-int numRowsInSegment,
-ParallelIndexTuningConfig indexTuningConfig
@Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
int numRowsInSegment
)
{
-// maxRowsInSegment should be null if numShards is set in indexTuningConfig
-final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize();
return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
}
private static boolean exceedMaxRowsInAppenderator(
-long numRowsInAppenderator,
-ParallelIndexTuningConfig indexTuningConfig
// maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
@Nullable Long maxRowsInAppenderator,
long numRowsInAppenderator
)
{
-// maxRowsInAppenderator should be null if numShards is set in indexTuningConfig
-final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
}

CompactionTaskTest.java

@ -28,8 +28,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
@ -49,6 +47,7 @@ import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
@ -59,6 +58,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@ -71,8 +71,14 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata; import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SimpleQueryableIndex; import org.apache.druid.segment.SimpleQueryableIndex;
import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.column.Column; import org.apache.druid.segment.column.Column;
import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.GenericColumn;
import org.apache.druid.segment.column.SpatialIndex;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.CompressionStrategy;
@ -84,6 +90,7 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.transform.TransformingInputRowParser; import org.apache.druid.segment.transform.TransformingInputRowParser;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthTestUtils;
@@ -120,6 +127,8 @@ import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class CompactionTaskTest
{
private static final long SEGMENT_SIZE_BYTES = 100;
private static final int NUM_ROWS_PER_SEGMENT = 10;
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
@@ -204,7 +213,7 @@
new ArrayList<>(AGGREGATORS.keySet()),
new NumberedShardSpec(0, 1),
0,
-1
SEGMENT_SIZE_BYTES
),
new File("file_" + i)
);
@@ -225,17 +234,12 @@
);
GuiceInjectableValues injectableValues = new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
-ImmutableList.<Module>of(
-new Module()
-{
-@Override
-public void configure(Binder binder)
-{
ImmutableList.of(
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
}
-}
)
)
);
@@ -265,7 +269,7 @@
private static IndexTuningConfig createTuningConfig()
{
return new IndexTuningConfig(
-5000000,
null, // null to compute targetPartitionSize automatically
500000,
1000000L,
null,
@@ -329,6 +333,7 @@
null,
null,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@ -359,6 +364,7 @@ public class CompactionTaskTest
SEGMENTS, SEGMENTS,
null, null,
null, null,
null,
createTuningConfig(), createTuningConfig(),
ImmutableMap.of("testKey", "testContext"), ImmutableMap.of("testKey", "testContext"),
objectMapper, objectMapper,
@@ -373,19 +379,21 @@
Assert.assertEquals(task.getInterval(), fromJson.getInterval());
Assert.assertEquals(task.getSegments(), fromJson.getSegments());
Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
Assert.assertEquals(task.isKeepSegmentGranularity(), fromJson.isKeepSegmentGranularity());
Assert.assertEquals(task.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(task.getContext(), fromJson.getContext());
}
@Test
-public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
-TUNING_CONFIG,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -393,6 +401,12 @@ public class CompactionTaskTest
); );
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size()); Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
} else { } else {
@ -402,14 +416,39 @@ public class CompactionTaskTest
} }
@Test @Test
public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
{ {
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
5,
500000,
1000000L,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
null,
100L,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema( final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox, toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, tuningConfig),
null, null,
keepSegmentGranularity, keepSegmentGranularity,
TUNING_CONFIG,
objectMapper objectMapper
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -417,11 +456,142 @@ public class CompactionTaskTest
); );
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size()); Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
} else { } else {
Assert.assertEquals(1, ingestionSpecs.size()); Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL)); assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig
);
}
}
@Test
public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
500000,
1000000L,
5L,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
null,
100L,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
keepSegmentGranularity
);
if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig
);
}
}
@Test
public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
500000,
1000000L,
null,
null,
3,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
null,
100L,
null,
null,
null,
null
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
keepSegmentGranularity
);
if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig
);
} }
} }
@ -458,13 +628,19 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema( final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox, toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, TUNING_CONFIG),
customSpec, customSpec,
keepSegmentGranularity, keepSegmentGranularity,
TUNING_CONFIG,
objectMapper objectMapper
); );
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size()); Assert.assertEquals(5, ingestionSpecs.size());
final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5); final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec)); IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
@ -489,9 +665,9 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema( final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox, toolbox,
new SegmentProvider(SEGMENTS), new SegmentProvider(SEGMENTS),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null, null,
keepSegmentGranularity, keepSegmentGranularity,
TUNING_CONFIG,
objectMapper objectMapper
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -499,6 +675,12 @@ public class CompactionTaskTest
); );
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(5, ingestionSpecs.size()); Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
} else { } else {
@ -518,9 +700,9 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema( CompactionTask.createIngestionSchema(
toolbox, toolbox,
new SegmentProvider(segments), new SegmentProvider(segments),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null, null,
keepSegmentGranularity, keepSegmentGranularity,
TUNING_CONFIG,
objectMapper objectMapper
); );
} }
@ -537,9 +719,9 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema( CompactionTask.createIngestionSchema(
toolbox, toolbox,
new SegmentProvider(segments), new SegmentProvider(segments),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null, null,
keepSegmentGranularity, keepSegmentGranularity,
TUNING_CONFIG,
objectMapper objectMapper
); );
} }
@ -560,6 +742,7 @@ public class CompactionTaskTest
null, null,
null, null,
null, null,
null,
objectMapper, objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(), new NoopChatHandlerProvider(),
@ -567,6 +750,46 @@ public class CompactionTaskTest
); );
} }
@Test
public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
5,
500000,
1000000L,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
null,
100L,
null,
null,
null,
null
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be used with");
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(5L, tuningConfig),
null,
keepSegmentGranularity,
objectMapper
);
}
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity) private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
{ {
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
@ -617,6 +840,45 @@ public class CompactionTaskTest
List<DimensionsSpec> expectedDimensionsSpecs, List<DimensionsSpec> expectedDimensionsSpecs,
List<Interval> expectedSegmentIntervals List<Interval> expectedSegmentIntervals
) )
{
assertIngestionSchema(
ingestionSchemas,
expectedDimensionsSpecs,
expectedSegmentIntervals,
new IndexTuningConfig(
41943040, // automatically computed targetPartitionSize
500000,
1000000L,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
null,
100L,
null,
null,
null,
null
)
);
}
private static void assertIngestionSchema(
List<IndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<Interval> expectedSegmentIntervals,
IndexTuningConfig expectedTuningConfig
)
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
ingestionSchemas.size() == expectedDimensionsSpecs.size(), ingestionSchemas.size() == expectedDimensionsSpecs.size(),
@@ -677,7 +939,7 @@
);
// assert tuningConfig
-Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
}
}
@@ -841,17 +1103,74 @@
private static Column createColumn(DimensionSchema dimensionSchema)
{
-return new ColumnBuilder()
-.setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()))
-.setDictionaryEncodedColumn(() -> null)
-.setBitmapIndex(() -> null)
-.build();
return new TestColumn(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()));
}
private static Column createColumn(AggregatorFactory aggregatorFactory)
{
-return new ColumnBuilder()
-.setType(ValueType.fromString(aggregatorFactory.getTypeName()))
-.build();
return new TestColumn(ValueType.fromString(aggregatorFactory.getTypeName()));
}
private static class TestColumn implements Column
{
private final ColumnCapabilities columnCapabilities;
TestColumn(ValueType type)
{
columnCapabilities = new ColumnCapabilitiesImpl()
.setType(type)
.setDictionaryEncoded(type == ValueType.STRING) // set a fake value to make string columns
.setHasBitmapIndexes(type == ValueType.STRING)
.setHasSpatialIndexes(false)
.setHasMultipleValues(false);
}
@Override
public ColumnCapabilities getCapabilities()
{
return columnCapabilities;
}
@Override
public int getLength()
{
return NUM_ROWS_PER_SEGMENT;
}
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{
return null;
}
@Override
public GenericColumn getGenericColumn()
{
return null;
}
@Override
public ComplexColumn getComplexColumn()
{
return null;
}
@Override
public BitmapIndex getBitmapIndex()
{
return null;
}
@Override
public SpatialIndex getSpatialIndex()
{
return null;
}
@Override
public SettableColumnValueSelector makeSettableColumnValueSelector()
{
return null;
}
}
}

TaskSerdeTest.java

@@ -106,14 +106,14 @@
IndexTask.IndexTuningConfig.class
);
-Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
-Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs());
Assert.assertFalse(tuningConfig.isReportParseExceptions());
Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod());
Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
-Assert.assertEquals(null, tuningConfig.getNumShards());
-Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
Assert.assertNull(tuningConfig.getNumShards());
Assert.assertNull(tuningConfig.getTargetPartitionSize());
}
@Test
@@ -125,14 +125,14 @@
);
Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
-Assert.assertEquals(null, tuningConfig.getNumShards());
Assert.assertNull(tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\", \"numShards\":10}",
IndexTask.IndexTuningConfig.class
);
-Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
Assert.assertNull(tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
@@ -140,7 +140,7 @@
IndexTask.IndexTuningConfig.class
);
-Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
Assert.assertNull(tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
@@ -148,7 +148,7 @@
IndexTask.IndexTuningConfig.class
);
-Assert.assertEquals(null, tuningConfig.getNumShards());
Assert.assertNull(tuningConfig.getNumShards());
Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
tuningConfig = jsonMapper.readValue(
@@ -156,11 +156,8 @@
IndexTask.IndexTuningConfig.class
);
-Assert.assertEquals(null, tuningConfig.getNumShards());
-Assert.assertEquals(
-IndexTask.IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE,
-(int) tuningConfig.getTargetPartitionSize()
-);
Assert.assertNull(tuningConfig.getNumShards());
Assert.assertNull(tuningConfig.getTargetPartitionSize());
}
@Test

DataSourceCompactionConfig.java

@@ -31,9 +31,10 @@ import java.util.Objects;
public class DataSourceCompactionConfig
{
public static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
-private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
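DEFAULT_TARGET_COMPACTION_SIZE_BYTES becomes public here so CompactionTask's PartitionConfigurationManager can fall back to it. Its validation rule, in short: an explicit targetCompactionSizeBytes conflicts with targetPartitionSize, maxTotalRows, and numShards; when it is omitted, the 400 MB default applies only if none of those are set. A stand-alone sketch of that decision table (hypothetical helper, mirroring getValidTargetCompactionSizeBytes above):

public class TargetCompactionSizeValidation
{
  static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400L * 1024 * 1024;

  // hasPartitionConfig == true when targetPartitionSize, maxTotalRows, or numShards is set.
  static Long resolve(Long targetCompactionSizeBytes, boolean hasPartitionConfig)
  {
    if (targetCompactionSizeBytes != null) {
      if (hasPartitionConfig) {
        throw new IllegalArgumentException(
            "targetCompactionSizeBytes cannot be used with targetPartitionSize, maxTotalRows, or numShards"
        );
      }
      return targetCompactionSizeBytes;
    }
    return hasPartitionConfig ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(null, false));         // 419430400 (falls back to the 400MB default)
    System.out.println(resolve(null, true));          // null (explicit partition config wins)
    System.out.println(resolve(100_000_000L, false)); // 100000000
    // resolve(100_000_000L, true) would throw IllegalArgumentException
  }
}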