Stateful auto compaction (#8573)

* Stateful auto compaction

* javadoc

* add removed test back

* fix test

* adding indexSpec to compactionState

* fix build

* add lastCompactionState

* address comments

* extract CompactionState

* fix doc

* fix build and test

* Add a task context to store compaction state; add javadoc

* fix it test
Jihoon Son 2019-10-15 22:57:42 -07:00 committed by GitHub
parent 1a78a0c98a
commit 4046c86d62
78 changed files with 764 additions and 740 deletions

View File

@ -103,6 +103,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -177,7 +178,7 @@ public class CachingClusteredClientBenchmark
new Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), JSON_MAPPER)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
);
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
@ -60,7 +61,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
@Param("100")
private int numDataSources;
@ -72,7 +73,7 @@ public class NewestSegmentFirstPolicyBenchmark
private int numPartitionsPerDayInterval;
@Param("800000000")
private long targetCompactionSizeBytes;
private long inputSegmentSizeBytes;
@Param("1000000")
private long segmentSizeBytes;
@ -94,8 +95,7 @@ public class NewestSegmentFirstPolicyBenchmark
new DataSourceCompactionConfig(
dataSource,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
inputSegmentSizeBytes,
null,
null,
null,

View File

@ -30,11 +30,15 @@ import java.util.Objects;
*/
public class DynamicPartitionsSpec implements PartitionsSpec
{
/**
* Default maxTotalRows for most task types except compaction task.
*/
public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
static final String NAME = "dynamic";
private final int maxRowsPerSegment;
private final long maxTotalRows;
@Nullable
private final Long maxTotalRows;
@JsonCreator
public DynamicPartitionsSpec(
@ -45,7 +49,7 @@ public class DynamicPartitionsSpec implements PartitionsSpec
this.maxRowsPerSegment = PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)
? DEFAULT_MAX_ROWS_PER_SEGMENT
: maxRowsPerSegment;
this.maxTotalRows = PartitionsSpec.isEffectivelyNull(maxTotalRows) ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
this.maxTotalRows = maxTotalRows;
}
@Override
@ -55,12 +59,22 @@ public class DynamicPartitionsSpec implements PartitionsSpec
return maxRowsPerSegment;
}
@Nullable
@JsonProperty
public long getMaxTotalRows()
public Long getMaxTotalRows()
{
return maxTotalRows;
}
/**
* Get the given maxTotalRows or the default.
* The default can be different depending on the caller.
*/
public long getMaxTotalRowsOr(long defaultMaxTotalRows)
{
return PartitionsSpec.isEffectivelyNull(maxTotalRows) ? defaultMaxTotalRows : maxTotalRows;
}
@Override
public boolean needsDeterminePartitions(boolean useForHadoopTask)
{
@ -78,7 +92,7 @@ public class DynamicPartitionsSpec implements PartitionsSpec
}
DynamicPartitionsSpec that = (DynamicPartitionsSpec) o;
return maxRowsPerSegment == that.maxRowsPerSegment &&
maxTotalRows == that.maxTotalRows;
Objects.equals(maxTotalRows, that.maxTotalRows);
}
@Override

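The nullable maxTotalRows together with the new getMaxTotalRowsOr method moves the choice of default from the spec to each caller. A minimal sketch of the intended call pattern, assuming the spec is built directly with the two-argument constructor shown above (the row counts are illustrative):

    // maxTotalRows left null: the default is deferred to the caller.
    DynamicPartitionsSpec spec = new DynamicPartitionsSpec(5_000_000, null);

    // Streaming and native batch ingestion fall back to the shared default of 20,000,000 rows.
    long forIngestion = spec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS);

    // Compaction passes Long.MAX_VALUE so the computed maxRowsPerSegment is respected.
    long forCompaction = spec.getMaxTotalRowsOr(Long.MAX_VALUE);
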
View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import java.util.Map;
import java.util.Objects;
/**
* This class describes what compaction task spec was used to create a given segment.
* The compaction task is a task that reads Druid segments and overwrites them with new ones. Since this task always
* reads segments in the same order, the same task spec will always create the same set of segments
* (not the same segment IDs, but the same content).
*
* Note that this class doesn't include all fields in the compaction task spec. Only the configurations that can
* affect the content of segments should be included.
*
* @see DataSegment#lastCompactionState
*/
public class CompactionState
{
private final PartitionsSpec partitionsSpec;
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec
)
{
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
@JsonProperty
public Map<String, Object> getIndexSpec()
{
return indexSpec;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec);
}
@Override
public int hashCode()
{
return Objects.hash(partitionsSpec, indexSpec);
}
@Override
public String toString()
{
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
'}';
}
}
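
The coordinator later compares this stored state against the compaction config it would apply today, so only content-affecting settings belong here. A minimal sketch of attaching a state to a segment through the builder method added further down in this patch (values and the dataSource are illustrative; imports as in the surrounding tests):

    CompactionState state = new CompactionState(
        new DynamicPartitionsSpec(5_000_000, Long.MAX_VALUE),
        ImmutableMap.of("bitmap", ImmutableMap.of("type", "concise"))
    );
    DataSegment segment = DataSegment.builder()
                                     .dataSource("wikipedia")                        // illustrative dataSource
                                     .interval(Intervals.of("2019-01-01/2019-01-02"))
                                     .version("version")
                                     .shardSpec(new NumberedShardSpec(0, 1))
                                     .lastCompactionState(state)                     // new builder method
                                     .size(1024)
                                     .build();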

View File

@ -36,15 +36,12 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.jackson.CommaListJoinSerializer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -64,21 +61,23 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
*/
/**
* This class is needed for optional injection of pruneLoadSpec, see
* This class is needed for optional injection of pruneLoadSpec and pruneLastCompactionState, see
* github.com/google/guice/wiki/FrequentlyAskedQuestions#how-can-i-inject-optional-parameters-into-a-constructor
*/
@VisibleForTesting
public static class PruneLoadSpecHolder
public static class PruneSpecsHolder
{
@VisibleForTesting
public static final PruneLoadSpecHolder DEFAULT = new PruneLoadSpecHolder();
public static final PruneSpecsHolder DEFAULT = new PruneSpecsHolder();
@Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false;
@Inject(optional = true) @PruneLastCompactionState boolean pruneLastCompactionState = false;
}
private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> DIMENSIONS_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> METRICS_INTERNER = Interners.newWeakInterner();
private static final Interner<CompactionState> COMPACTION_STATE_INTERNER = Interners.newWeakInterner();
private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space",
""
@ -91,6 +90,16 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
private final List<String> dimensions;
private final List<String> metrics;
private final ShardSpec shardSpec;
/**
* Stores some configurations of the compaction task which created this segment.
* This field is stored in the metadata store only when "storeCompactionState" is set to true in the context of the
* compaction task, which is false by default.
* Also, this field can be pruned in many Druid modules when this class is loaded from the metadata store.
* See {@link PruneLastCompactionState} for details.
*/
@Nullable
private final CompactionState lastCompactionState;
private final long size;
public DataSegment(
@ -99,6 +108,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
List<String> dimensions,
List<String> metrics,
ShardSpec shardSpec,
CompactionState lastCompactionState,
Integer binaryVersion,
long size
)
@ -111,6 +121,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size
);
@ -136,9 +147,37 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
dimensions,
metrics,
shardSpec,
null,
binaryVersion,
size
);
}
public DataSegment(
String dataSource,
Interval interval,
String version,
Map<String, Object> loadSpec,
List<String> dimensions,
List<String> metrics,
ShardSpec shardSpec,
CompactionState lastCompactionState,
Integer binaryVersion,
long size
)
{
this(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size,
PruneLoadSpecHolder.DEFAULT
PruneSpecsHolder.DEFAULT
);
}
@ -158,18 +197,22 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JacksonInject PruneLoadSpecHolder pruneLoadSpecHolder
@JacksonInject PruneSpecsHolder pruneSpecsHolder
)
{
this.id = SegmentId.of(dataSource, interval, version, shardSpec);
this.loadSpec = pruneLoadSpecHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec);
this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec);
// Deduplicating dimensions and metrics lists as a whole because they are very likely the same for the same
// dataSource
this.dimensions = prepareDimensionsOrMetrics(dimensions, DIMENSIONS_INTERNER);
this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER);
this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : shardSpec;
this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState
? null
: prepareCompactionState(lastCompactionState);
this.binaryVersion = binaryVersion;
this.size = size;
}
@ -188,6 +231,15 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
return result;
}
@Nullable
private CompactionState prepareCompactionState(@Nullable CompactionState lastCompactionState)
{
if (lastCompactionState == null) {
return null;
}
return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
}
private List<String> prepareDimensionsOrMetrics(@Nullable List<String> list, Interner<List<String>> interner)
{
if (list == null) {
@ -256,6 +308,13 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
return shardSpec;
}
@Nullable
@JsonProperty
public CompactionState getLastCompactionState()
{
return lastCompactionState;
}
@JsonProperty
public Integer getBinaryVersion()
{
@ -390,33 +449,11 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
", dimensions=" + dimensions +
", metrics=" + metrics +
", shardSpec=" + shardSpec +
", lastCompactionState=" + lastCompactionState +
", size=" + size +
'}';
}
public static Comparator<DataSegment> bucketMonthComparator()
{
return new Comparator<DataSegment>()
{
@Override
public int compare(DataSegment lhs, DataSegment rhs)
{
int retVal;
DateTime lhsMonth = Granularities.MONTH.bucketStart(lhs.getInterval().getStart());
DateTime rhsMonth = Granularities.MONTH.bucketStart(rhs.getInterval().getStart());
retVal = lhsMonth.compareTo(rhsMonth);
if (retVal != 0) {
return retVal;
}
return lhs.compareTo(rhs);
}
};
}
public static Builder builder()
{
return new Builder();
@ -436,6 +473,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
private List<String> dimensions;
private List<String> metrics;
private ShardSpec shardSpec;
private CompactionState lastCompactionState;
private Integer binaryVersion;
private long size;
@ -457,6 +495,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
this.dimensions = segment.getDimensions();
this.metrics = segment.getMetrics();
this.shardSpec = segment.getShardSpec();
this.lastCompactionState = segment.getLastCompactionState();
this.binaryVersion = segment.getBinaryVersion();
this.size = segment.getSize();
}
@ -503,6 +542,12 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
return this;
}
public Builder lastCompactionState(CompactionState compactionState)
{
this.lastCompactionState = compactionState;
return this;
}
public Builder binaryVersion(Integer binaryVersion)
{
this.binaryVersion = binaryVersion;
@ -531,6 +576,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size
);
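
Because the Jackson-injected holder is renamed from PruneLoadSpecHolder to PruneSpecsHolder, every ObjectMapper that deserializes DataSegment has to register the new class, as the updated tests below do. A minimal sketch of that setup, assuming a plain DefaultObjectMapper (exception handling omitted):

    ObjectMapper mapper = new DefaultObjectMapper();
    InjectableValues.Std injectableValues = new InjectableValues.Std();
    // Without this value, the @JacksonInject PruneSpecsHolder parameter of DataSegment's
    // JSON constructor cannot be resolved and deserialization fails.
    injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
    mapper.setInjectableValues(injectableValues);

    DataSegment roundTripped = mapper.readValue(mapper.writeValueAsString(segment), DataSegment.class);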

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* This annotation is used to inject a boolean parameter into a {@link DataSegment} constructor that prescribes
* dropping the deserialized "lastCompactionState" rather than storing it in a field of the {@link DataSegment}.
* "lastCompactionState" is used only on the coordinator, peons, and indexers.
*
* - In the coordinator's auto compaction, "lastCompactionState" is used to determine whether the given
* segment needs further compaction or not.
* - In parallel indexing, "lastCompactionState" should be serialized and deserialized properly when
* the subtasks report the pushed segments to the supervisor task.
*/
@Target({ElementType.PARAMETER, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface PruneLastCompactionState
{
}
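
The annotation is consumed through Guice optional injection in PruneSpecsHolder above. A process that has no use for the compaction state (for example a Broker, by analogy with the existing pruneLoadSpec handling) could enable pruning with a binding like the one below; this is a generic Guice sketch with a hypothetical module name, not code from this patch:

    // Hypothetical module that turns on pruning of lastCompactionState.
    public class PruneLastCompactionStateModule implements com.google.inject.Module
    {
      @Override
      public void configure(com.google.inject.Binder binder)
      {
        binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
      }
    }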

View File

@ -26,9 +26,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.RangeSet;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
@ -39,13 +41,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*/
@ -106,7 +104,7 @@ public class DataSegmentTest
public void setUp()
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
MAPPER.setInjectableValues(injectableValues);
}
@ -125,6 +123,10 @@ public class DataSegmentTest
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
ImmutableMap.of()
),
TEST_VERSION,
1
);
@ -134,7 +136,7 @@ public class DataSegmentTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(10, objectMap.size());
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
@ -229,35 +231,6 @@ public class DataSegmentTest
Assert.assertEquals("empty metrics", ImmutableList.of(), segment2.getMetrics());
}
@Test
public void testBucketMonthComparator()
{
DataSegment[] sortedOrder = {
makeDataSegment("test1", "2011-01-01/2011-01-02", "a"),
makeDataSegment("test1", "2011-01-02/2011-01-03", "a"),
makeDataSegment("test1", "2011-01-02/2011-01-03", "b"),
makeDataSegment("test2", "2011-01-01/2011-01-02", "a"),
makeDataSegment("test2", "2011-01-02/2011-01-03", "a"),
makeDataSegment("test1", "2011-02-01/2011-02-02", "a"),
makeDataSegment("test1", "2011-02-02/2011-02-03", "a"),
makeDataSegment("test1", "2011-02-02/2011-02-03", "b"),
makeDataSegment("test2", "2011-02-01/2011-02-02", "a"),
makeDataSegment("test2", "2011-02-02/2011-02-03", "a"),
};
List<DataSegment> shuffled = new ArrayList<>(Arrays.asList(sortedOrder));
Collections.shuffle(shuffled);
Set<DataSegment> theSet = new TreeSet<>(DataSegment.bucketMonthComparator());
theSet.addAll(shuffled);
int index = 0;
for (DataSegment dataSegment : theSet) {
Assert.assertEquals(sortedOrder[index], dataSegment);
++index;
}
}
private DataSegment makeDataSegment(String dataSource, String interval, String version)
{
return DataSegment.builder()

View File

@ -29,6 +29,7 @@ import org.apache.druid.TestObjectMapper;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@ -50,7 +51,7 @@ public class SegmentWithOvershadowedStatusTest
public void setUp()
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
MAPPER.setInjectableValues(injectableValues);
}
@ -68,6 +69,7 @@ public class SegmentWithOvershadowedStatusTest
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
null,
TEST_VERSION,
1
);
@ -79,7 +81,7 @@ public class SegmentWithOvershadowedStatusTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
@ -133,6 +135,7 @@ class TestSegmentWithOvershadowedStatus extends DataSegment
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("overshadowed") boolean overshadowed
@ -146,6 +149,7 @@ class TestSegmentWithOvershadowedStatus extends DataSegment
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size
);

View File

@ -786,8 +786,7 @@ A description of the compaction config is:
|`dataSource`|dataSource name to be compacted.|yes|
|`taskPriority`|[Priority](../ingestion/tasks.html#priority) of compaction task.|no (default = 25)|
|`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 12GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)|
|`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)|
|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
|`maxRowsPerSegment`|Max number of rows per segment after compaction.|no|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#context) for compaction tasks.|no|

View File

@ -63,11 +63,11 @@ To ensure an even distribution of segments across Historical processes in the cl
### Compacting Segments
Each run, the Druid Coordinator compacts small segments abutting each other. This is useful when you have a lot of small
segments which may degrade query performance as well as increase disk space usage. See [Segment Size Optimization](../operations/segment-optimization.md) for details.
Each run, the Druid Coordinator compacts segments by merging small segments or splitting a large one. This is useful when your segments are not optimized
in terms of segment size, which may degrade query performance. See [Segment Size Optimization](../operations/segment-optimization.md) for details.
The Coordinator first finds the segments to compact together based on the [segment search policy](#segment-search-policy).
Once some segments are found, it launches a [compaction task](../ingestion/tasks.md#compact) to compact those segments.
The Coordinator first finds the segments to compact based on the [segment search policy](#segment-search-policy).
Once some segments are found, it issues a [compaction task](../ingestion/tasks.md#compact) to compact those segments.
The maximum number of running compaction tasks is `min(sum of worker capacity * slotRatio, maxSlots)`.
Note that even if `min(sum of worker capacity * slotRatio, maxSlots)` is 0, at least one compaction task is always submitted
if compaction is enabled for a dataSource.
@ -76,30 +76,41 @@ See [Compaction Configuration API](../operations/api-reference.html#compaction-c
Compaction tasks might fail due to the following reasons.
- If the input segments of a compaction task are removed or overshadowed before it starts, that compaction task fails immediately.
- If a task of a higher priority acquires a lock for an interval overlapping with the interval of a compaction task, the compaction task fails.
- If a task of a higher priority acquires a [time chunk lock](../ingestion/tasks.html#locking) for an interval overlapping with the interval of a compaction task, the compaction task fails.
Once a compaction task fails, the Coordinator simply finds the segments for the interval of the failed task again, and launches a new compaction task in the next run.
Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run.
### Segment search policy
#### Newest segment first policy
#### Recent segment first policy
At every coordinator run, this policy searches for segments to compact by iterating segments from the latest to the oldest.
Once it finds the latest segment among all dataSources, it checks if the segment is _compactable_ with other segments of the same dataSource which have the same or abutting intervals.
Note that segments are compactable if their total size is smaller than or equal to the configured `inputSegmentSizeBytes`.
At every coordinator run, this policy looks up time chunks in order of newest-to-oldest and checks whether the segments in those time chunks
need compaction or not.
A set of segments needs compaction if all of the conditions below are satisfied.
Here are some details with an example. Let us assume we have two dataSources (`foo`, `bar`)
and 5 segments (`foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION`, `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION`, `bar_2017-08-01T00:00:00.000Z_2017-09-01T00:00:00.000Z_VERSION`, `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`, `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION`).
When each segment has the same size of 10 MB and `inputSegmentSizeBytes` is 20 MB, this policy first returns two segments (`foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION`) to compact together because
`foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION` is the latest segment and `foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` abuts to it.
1) The total size of the segments in the time chunk is smaller than or equal to the configured `inputSegmentSizeBytes`.
2) The segments have never been compacted yet, or the compaction spec has been updated since the last compaction; in particular `maxRowsPerSegment`, `maxTotalRows`, and `indexSpec` (see the sketch at the end of this section).
If the coordinator has enough task slots for compaction, this policy would continue searching for the next segments and return
`bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`.
Note that `bar_2017-08-01T00:00:00.000Z_2017-09-01T00:00:00.000Z_VERSION` is not compacted together even though it abuts to `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`.
This is because the total segment size to compact would be greater than `inputSegmentSizeBytes` if it's included.
Here are some details with an example. Suppose we have two dataSources (`foo`, `bar`) as seen below:
- `foo`
- `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION`
- `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION_1`
- `foo_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`
- `bar`
- `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION`
- `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION_1`
Assuming that each segment is 10 MB and hasn't been compacted yet, this policy first returns the two segments
`foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION` and `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION_1` to compact together because
`2017-11-01T00:00:00.000Z/2017-12-01T00:00:00.000Z` is the most recent time chunk.
If the coordinator has enough task slots for compaction, this policy will continue searching for the next segments and return
`bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION_1`.
Finally, `foo_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION` will be picked up even though there is only one segment in the time chunk of `2017-09-01T00:00:00.000Z/2017-10-01T00:00:00.000Z`.
The search start point can be changed by setting [skipOffsetFromLatest](../configuration/index.html#compaction-dynamic-configuration).
If this is set, this policy will ignore the segments falling into the interval of (the end time of the very latest segment - `skipOffsetFromLatest`).
If this is set, this policy will ignore any segments whose interval falls after (the end time of the most recent segment - `skipOffsetFromLatest`).
This is to avoid conflicts between compaction tasks and realtime tasks.
Note that realtime tasks have a higher priority than compaction tasks by default. Realtime tasks will revoke the locks of compaction tasks if their intervals overlap, resulting in the termination of the compaction task.
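
The sketch referenced in condition 2 above: with lastCompactionState stored on each segment, the per-time-chunk decision reduces to a size check plus an equality check against the state the current compaction config would produce. Helper and variable names below are illustrative, not the actual NewestSegmentFirstIterator code:

    // Returns true if the segments of one time chunk should be handed to a compaction task.
    boolean needsCompaction(List<DataSegment> segmentsInTimeChunk, CompactionState configuredState, long inputSegmentSizeBytes)
    {
      long totalSize = segmentsInTimeChunk.stream().mapToLong(DataSegment::getSize).sum();
      if (totalSize > inputSegmentSizeBytes) {
        return false; // condition 1: too large to process in a single compaction task
      }
      // condition 2: never compacted (lastCompactionState is null), or compacted with a spec
      // different from the one the current compaction config would produce
      return segmentsInTimeChunk.stream()
                                .anyMatch(segment -> !configuredState.equals(segment.getLastCompactionState()));
    }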

View File

@ -102,7 +102,6 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"ioConfig": <IO config>,
"dimensions" <custom dimensionsSpec>,
"segmentGranularity": <segment granularity after compaction>,
"targetCompactionSizeBytes": <target size of compacted segments>
"tuningConfig" <index task tuningConfig>,
"context": <task context>
}
@ -117,7 +116,6 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if it exists instead of generating one. See below for more details.|No|
|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `maxRowsPerSegment`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
|`context`|[Task context](../ingestion/tasks.md#context)|No|

View File

@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.conf.Configuration;
@ -79,7 +80,7 @@ public class HdfsDataSegmentPusherTest
objectMapper = new TestObjectMapper();
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(ObjectMapper.class, objectMapper);
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
objectMapper.setInjectableValues(injectableValues);
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -63,7 +62,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS, config.getMaxTotalRows().longValue());
Assert.assertNull(config.getMaxTotalRows());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@ -250,28 +249,4 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config)
{
return new KafkaIndexTaskTuningConfig(
config.getMaxRowsInMemory(),
config.getMaxBytesInMemory(),
config.getMaxRowsPerSegment(),
config.getMaxTotalRows(),
config.getIntermediatePersistPeriod(),
config.getBasePersistDirectory(),
0,
config.getIndexSpec(),
config.getIndexSpecForIntermediatePersists(),
true,
config.isReportParseExceptions(),
config.getHandoffConditionTimeout(),
config.isResetOffsetAutomatically(),
config.getSegmentWriteOutMediumFactory(),
config.getIntermediateHandoffPeriod(),
config.isLogParseExceptions(),
config.getMaxParseExceptions(),
config.getMaxSavedParseExceptions()
);
}
}

View File

@ -53,6 +53,7 @@ import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
@ -82,7 +83,7 @@ public class BatchDeltaIngestionTest
MAPPER.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, MAPPER)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
MAPPER.setInjectableValues(inject);
INDEX_IO = HadoopDruidIndexerConfig.INDEX_IO;

View File

@ -37,6 +37,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
@ -65,7 +66,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
jsonMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
);
}

View File

@ -163,6 +163,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return partitionsSpec.getMaxTotalRows();
}
@Override
public DynamicPartitionsSpec getPartitionsSpec()
{
return partitionsSpec;

View File

@ -40,6 +40,7 @@ import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -370,7 +371,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
if (addResult.isOk()) {
final boolean isPushRequired = addResult.isPushRequired(
tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
tuningConfig.getPartitionsSpec().getMaxTotalRows()
tuningConfig.getPartitionsSpec().getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired) {
publishSegments(driver, publisher, committerSupplier, sequenceName);

View File

@ -38,7 +38,8 @@ public final class BatchAppenderators
FireDepartmentMetrics metrics,
TaskToolbox toolbox,
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig
AppenderatorConfig appenderatorConfig,
boolean storeCompactionState
)
{
return newAppenderator(
@ -48,7 +49,8 @@ public final class BatchAppenderators
toolbox,
dataSchema,
appenderatorConfig,
toolbox.getSegmentPusher()
toolbox.getSegmentPusher(),
storeCompactionState
);
}
@ -59,13 +61,15 @@ public final class BatchAppenderators
TaskToolbox toolbox,
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
DataSegmentPusher segmentPusher
DataSegmentPusher segmentPusher,
boolean storeCompactionState
)
{
return appenderatorsManager.createOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
storeCompactionState,
metrics,
segmentPusher,
toolbox.getObjectMapper(),

View File

@ -45,7 +45,7 @@ import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
@ -58,7 +58,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@ -80,7 +79,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
@ -131,8 +129,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
private final IndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
@ -184,7 +180,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable final Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") @Nullable final IndexTuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@ -220,11 +215,10 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.partitionConfigurationManager = new PartitionConfigurationManager(tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@ -262,13 +256,6 @@ public class CompactionTask extends AbstractBatchIndexTask
return segmentGranularity;
}
@Nullable
@JsonProperty
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@Nullable
@JsonProperty
public IndexTuningConfig getTuningConfig()
@ -437,9 +424,7 @@ public class CompactionTask extends AbstractBatchIndexTask
toolbox.getIndexIO()
);
final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(
queryableIndexAndSegments
);
final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (segmentGranularity == null) {
// original granularity
@ -801,113 +786,32 @@ public class CompactionTask extends AbstractBatchIndexTask
@VisibleForTesting
static class PartitionConfigurationManager
{
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
private final IndexTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, @Nullable IndexTuningConfig tuningConfig)
PartitionConfigurationManager(@Nullable IndexTuningConfig tuningConfig)
{
this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig);
this.tuningConfig = tuningConfig;
}
@Nullable
IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments)
IndexTuningConfig computeTuningConfig()
{
if (!hasPartitionConfig(tuningConfig)) {
final long nonNullTargetCompactionSizeBytes = Preconditions.checkNotNull(
targetCompactionSizeBytes,
"targetCompactionSizeBytes"
IndexTuningConfig newTuningConfig = tuningConfig == null
? IndexTuningConfig.createDefault()
: tuningConfig;
PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
partitionsSpec = new DynamicPartitionsSpec(
dynamicPartitionsSpec.getMaxRowsPerSegment(),
// Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment.
// If this is set to something too small, compactionTask can generate small segments
// which need to be compacted again, which in turn makes auto compaction get stuck in the same interval.
dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE)
);
// Find IndexTuningConfig.maxRowsPerSegment which is the number of rows per segment.
// Assume that the segment size is proportional to the number of rows. We can improve this later.
final long totalNumRows = queryableIndexAndSegments
.stream()
.mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.lhs.getNumRows())
.sum();
final long totalSizeBytes = queryableIndexAndSegments
.stream()
.mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.rhs.getSize())
.sum();
if (totalSizeBytes == 0L) {
throw new ISE("Total input segment size is 0 byte");
}
final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
final long maxRowsPerSegmentLong = Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes);
final int maxRowsPerSegment = Numbers.toIntExact(
maxRowsPerSegmentLong,
StringUtils.format(
"Estimated maxRowsPerSegment[%s] is out of integer value range. "
+ "Please consider reducing targetCompactionSizeBytes[%s].",
maxRowsPerSegmentLong,
targetCompactionSizeBytes
)
);
Preconditions.checkState(maxRowsPerSegment > 0, "Negative maxRowsPerSegment[%s]", maxRowsPerSegment);
log.info(
"Estimated maxRowsPerSegment[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]",
maxRowsPerSegment,
avgRowsPerByte,
nonNullTargetCompactionSizeBytes
);
// Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment.
// If this is set to something too small, compactionTask can generate small segments
// which need to be compacted again, which in turn making auto compaction stuck in the same interval.
final IndexTuningConfig newTuningConfig = tuningConfig == null
? IndexTuningConfig.createDefault()
: tuningConfig;
if (newTuningConfig.isForceGuaranteedRollup()) {
return newTuningConfig.withPartitionsSpec(new HashedPartitionsSpec(maxRowsPerSegment, null, null));
} else {
return newTuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, Long.MAX_VALUE));
}
} else {
return tuningConfig;
}
}
/**
* Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that
* targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#getPartitionsSpec} together.
* {@link #hasPartitionConfig} checks one of those configs is set.
* <p>
* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
* returns true. If targetCompactionSizeBytes is not set, this returns null or
* {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
* hasPartitionConfig.
*/
@Nullable
private static Long getValidTargetCompactionSizeBytes(
@Nullable Long targetCompactionSizeBytes,
@Nullable IndexTuningConfig tuningConfig
)
{
if (targetCompactionSizeBytes != null && tuningConfig != null) {
Preconditions.checkArgument(
!hasPartitionConfig(tuningConfig),
"targetCompactionSizeBytes[%s] cannot be used with partitionsSpec[%s]",
targetCompactionSizeBytes,
tuningConfig.getPartitionsSpec()
);
return targetCompactionSizeBytes;
} else {
return hasPartitionConfig(tuningConfig)
? null
: DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
}
}
private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig)
{
if (tuningConfig != null) {
return tuningConfig.getPartitionsSpec() != null;
} else {
return false;
}
return newTuningConfig.withPartitionsSpec(partitionsSpec);
}
}
@ -931,8 +835,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private Granularity segmentGranularity;
@Nullable
private Long targetCompactionSizeBytes;
@Nullable
private IndexTuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
@ -994,12 +896,6 @@ public class CompactionTask extends AbstractBatchIndexTask
return this;
}
public Builder targetCompactionSizeBytes(long targetCompactionSizeBytes)
{
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
return this;
}
public Builder tuningConfig(IndexTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
@ -1025,7 +921,6 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec,
metricsSpec,
segmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context,
jsonMapper,

View File

@ -128,7 +128,7 @@ public class FiniteFirehoseProcessor
if (dynamicPartitionsSpec != null) {
final boolean isPushRequired = addResult.isPushRequired(
dynamicPartitionsSpec.getMaxRowsPerSegment(),
dynamicPartitionsSpec.getMaxTotalRows()
dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired) {
// There can be some segments waiting to be pushed even though no more rows will be added to them

View File

@ -873,7 +873,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
buildSegmentsFireDepartmentMetrics,
toolbox,
dataSchema,
tuningConfig
tuningConfig,
getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE)
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
@ -1321,6 +1322,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@JsonProperty
@Nullable
@Override
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;

View File

@ -37,10 +37,15 @@ public class Tasks
public static final int DEFAULT_TASK_PRIORITY = 0;
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
// This context key is used in auto compaction. When it is set to true in the task context, the segments created by the task
// will have 'lastCompactionState' filled in their metadata. This is used to track which segments are compacted and which are not.
// See DataSegment and NewestSegmentFirstIterator for more details.
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
public static SortedSet<Interval> computeCompactIntervals(SortedSet<Interval> intervals)
{

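A minimal sketch of how the new context key is used, mirroring the test changes later in this patch (the compactionTask variable is illustrative):

    // Auto compaction sets this key so the produced segments carry their CompactionState.
    compactionTask.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);

    // Inside the task, the flag is read back with the documented default of false.
    boolean storeCompactionState = compactionTask.getContextValue(
        Tasks.STORE_COMPACTION_STATE_KEY,
        Tasks.DEFAULT_STORE_COMPACTION_STATE
    );
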
View File

@ -288,7 +288,8 @@ public class PartialSegmentGenerateTask extends AbstractBatchIndexTask
toolbox,
dataSchema,
tuningConfig,
new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager())
new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()),
getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE)
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {

View File

@ -417,7 +417,8 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
fireDepartmentMetrics,
toolbox,
dataSchema,
tuningConfig
tuningConfig,
getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE)
);
boolean exceptionOccurred = false;
try (
@ -461,7 +462,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
if (addResult.isOk()) {
final boolean isPushRequired = addResult.isPushRequired(
partitionsSpec.getMaxRowsPerSegment(),
partitionsSpec.getMaxTotalRows()
partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired) {
// There can be some segments waiting to be published even though no more rows will be added to them.

View File

@ -44,6 +44,7 @@ import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
@ -638,7 +639,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
final boolean isPushRequired = addResult.isPushRequired(
tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
tuningConfig.getPartitionsSpec().getMaxTotalRows()
tuningConfig.getPartitionsSpec().getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired && !sequenceToUse.isCheckpointed()) {
sequenceToCheckpoint = sequenceToUse;

View File

@ -159,6 +159,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
return partitionsSpec.getMaxTotalRows();
}
@Override
public DynamicPartitionsSpec getPartitionsSpec()
{
return partitionsSpec;

View File

@ -45,7 +45,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import java.util.concurrent.TimeUnit;
@ -87,7 +87,7 @@ public class TestUtils
.addValue(AuthConfig.class, new AuthConfig())
.addValue(AuthorizerMapper.class, null)
.addValue(RowIngestionMetersFactory.class, rowIngestionMetersFactory)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
.addValue(IndexingServiceClient.class, new NoopIndexingServiceClient())
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, new TestAppenderatorsManager())

View File

@ -69,7 +69,6 @@ public class ClientCompactQuerySerdeTest
"testSha256OfSortedSegmentIds"
)
),
null,
new ClientCompactQueryTuningConfig(
100,
40000,
@ -100,7 +99,6 @@ public class ClientCompactQuerySerdeTest
query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
);
Assert.assertEquals(query.getTargetCompactionSizeBytes(), task.getTargetCompactionSizeBytes());
Assert.assertEquals(
query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory()
);

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
@ -30,6 +31,7 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@ -55,6 +57,7 @@ import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -111,6 +114,19 @@ public class CompactionTaskRunTest extends IngestionTestBase
false,
0
);
private static final CompactionState DEFAULT_COMPACTION_STATE = new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "concise"),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
);
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
@ -191,6 +207,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
@ -235,6 +252,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
@ -261,6 +279,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(
@ -364,6 +383,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
@ -408,6 +428,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState());
// hour segmentGranularity
final CompactionTask compactionTask2 = builder
@ -425,6 +446,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
}
}
@ -727,6 +749,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
if (task.isReady(box.getTaskActionClient())) {
if (readyLatchToCountDown != null) {
readyLatchToCountDown.countDown();

View File

@ -62,7 +62,6 @@ import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@ -418,7 +417,6 @@ public class CompactionTaskTest
Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig());
Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec());
Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes());
Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
Assert.assertEquals(expected.getContext(), actual.getContext());
}
@ -429,7 +427,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
@ -489,7 +487,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
@ -550,7 +548,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
@ -611,7 +609,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
@ -672,7 +670,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
customSpec,
null,
null,
@ -713,7 +711,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
customMetricsSpec,
null,
@ -747,7 +745,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
@ -787,7 +785,7 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
@ -810,7 +808,7 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
@ -844,59 +842,13 @@ public class CompactionTaskTest
.build();
}
@Test
public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
500000,
1000000L,
null,
null,
null,
null,
new HashedPartitionsSpec(6, null, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
5000,
true,
false,
null,
100L,
null,
null,
null,
null
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with");
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(6L, tuningConfig),
null,
null,
null,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
}
@Test
public void testSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new PeriodGranularity(Period.months(3), null, null),
@ -931,7 +883,7 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
@ -958,27 +910,6 @@ public class CompactionTaskTest
);
}
@Test
public void testHugeTargetCompactionSize()
{
final PartitionConfigurationManager manager = new PartitionConfigurationManager(Long.MAX_VALUE, TUNING_CONFIG);
final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
final Map<File, QueryableIndex> queryableIndexMap = indexIO.getQueryableIndexMap();
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Entry<DataSegment, File> entry : SEGMENT_MAP.entrySet()) {
final DataSegment segment = entry.getKey();
final File file = entry.getValue();
segments.add(Pair.of(Preconditions.checkNotNull(queryableIndexMap.get(file)), segment));
}
expectedException.expect(ArithmeticException.class);
expectedException.expectMessage(
CoreMatchers.startsWith("Estimated maxRowsPerSegment[922337203685477632] is out of integer value range.")
);
manager.computeTuningConfig(segments);
}
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration()
{
return ImmutableList.of(
@ -1045,7 +976,7 @@ public class CompactionTaskTest
null,
null,
null,
new HashedPartitionsSpec(41943040, null, null), // automatically computed targetPartitionSize
new HashedPartitionsSpec(null, null, null), // automatically computed targetPartitionSize
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,

View File

@ -89,6 +89,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@ -99,6 +100,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
return Appenderators.createOffline(
schema,
config,
storeCompactionState,
metrics,
dataSegmentPusher,
objectMapper,

View File

@ -13,6 +13,6 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
"payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}"
"payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"lastCompactionState\":null,\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}"
}
]

View File

@ -36,7 +36,7 @@ import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.junit.Assert;
import java.util.HashMap;
@ -80,7 +80,7 @@ public class TestHelper
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), mapper)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
);
return mapper;
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
@ -34,8 +33,6 @@ public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final ClientCompactionIOConfig ioConfig;
@Nullable
private final Long targetCompactionSizeBytes;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> context;
@ -43,14 +40,12 @@ public class ClientCompactQuery implements ClientQuery
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.context = context;
}
@ -75,13 +70,6 @@ public class ClientCompactQuery implements ClientQuery
return ioConfig;
}
@JsonProperty
@Nullable
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@JsonProperty
public ClientCompactQueryTuningConfig getTuningConfig()
{
@ -106,7 +94,6 @@ public class ClientCompactQuery implements ClientQuery
ClientCompactQuery that = (ClientCompactQuery) o;
return Objects.equals(dataSource, that.dataSource) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
}
@ -114,7 +101,7 @@ public class ClientCompactQuery implements ClientQuery
@Override
public int hashCode()
{
return Objects.hash(dataSource, ioConfig, targetCompactionSizeBytes, tuningConfig, context);
return Objects.hash(dataSource, ioConfig, tuningConfig, context);
}
@Override
@ -123,7 +110,6 @@ public class ClientCompactQuery implements ClientQuery
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", ioConfig=" + ioConfig +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", context=" + context +
'}';

View File

@ -116,6 +116,11 @@ public class ClientCompactQueryTuningConfig
return maxTotalRows;
}
public long getMaxTotalRowsOr(long defaultMaxTotalRows)
{
return maxTotalRows == null ? defaultMaxTotalRows : maxTotalRows;
}
@JsonProperty
@Nullable
public IndexSpec getIndexSpec()

View File

@ -73,13 +73,12 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
)
{
Preconditions.checkArgument(segments.size() > 1, "Expect two or more segments to compact");
Preconditions.checkArgument(!segments.isEmpty(), "Expect non-empty segments to compact");
final String dataSource = segments.get(0).getDataSource();
Preconditions.checkArgument(
@ -94,7 +93,6 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
new ClientCompactQuery(
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
targetCompactionSizeBytes,
tuningConfig,
context
)

View File

@ -37,7 +37,6 @@ public interface IndexingServiceClient
String compactSegments(
List<DataSegment> segments,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@ -225,6 +226,12 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
return maxPendingPersists;
}
@Override
public PartitionsSpec getPartitionsSpec()
{
throw new UnsupportedOperationException();
}
@JsonProperty
public ShardSpec getShardSpec()
{

View File

@ -27,7 +27,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.plumber.Plumber;
import java.io.IOException;
@ -88,11 +87,6 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
return tuningConfig;
}
public Plumber findPlumber()
{
return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics);
}
public Firehose connect() throws IOException
{
return ioConfig.getFirehoseFactory().connect(dataSchema.getParser(), null);

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@ -60,6 +61,8 @@ public interface AppenderatorConfig
throw new UnsupportedOperationException("maxTotalRows is not implemented.");
}
PartitionsSpec getPartitionsSpec();
/**
* Period that sets frequency to persist to local storage if no other thresholds are met
*/

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@ -38,8 +39,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@ -53,10 +52,8 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
@ -73,6 +70,7 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@ -94,7 +92,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -113,6 +110,7 @@ public class AppenderatorImpl implements Appenderator
private final DataSchema schema;
private final AppenderatorConfig tuningConfig;
private final boolean storeCompactionState;
private final FireDepartmentMetrics metrics;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper objectMapper;
@ -154,50 +152,6 @@ public class AppenderatorImpl implements Appenderator
private volatile Throwable persistError;
AppenderatorImpl(
DataSchema schema,
AppenderatorConfig tuningConfig,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
IndexIO indexIO,
IndexMerger indexMerger,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
this(
schema,
tuningConfig,
metrics,
dataSegmentPusher,
objectMapper,
segmentAnnouncer,
conglomerate == null ? null : new SinkQuerySegmentWalker(
schema.getDataSource(),
new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
),
objectMapper,
emitter,
conglomerate,
queryExecutorService,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats
),
indexIO,
indexMerger,
cache
);
log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
}
/**
* This constructor allows the caller to provide its own SinkQuerySegmentWalker.
*
@ -210,11 +164,12 @@ public class AppenderatorImpl implements Appenderator
AppenderatorImpl(
DataSchema schema,
AppenderatorConfig tuningConfig,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
DataSegmentAnnouncer segmentAnnouncer,
SinkQuerySegmentWalker sinkQuerySegmentWalker,
@Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
IndexIO indexIO,
IndexMerger indexMerger,
Cache cache
@ -222,6 +177,7 @@ public class AppenderatorImpl implements Appenderator
{
this.schema = Preconditions.checkNotNull(schema, "schema");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.storeCompactionState = storeCompactionState;
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
@ -243,7 +199,6 @@ public class AppenderatorImpl implements Appenderator
log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
}
@Override
public String getDataSource()
{
@ -429,10 +384,15 @@ public class AppenderatorImpl implements Appenderator
Sink retVal = sinks.get(identifier);
if (retVal == null) {
final Map<String, Object> indexSpecMap = objectMapper.convertValue(
tuningConfig.getIndexSpec(),
new TypeReference<Map<String, Object>>() {}
);
retVal = new Sink(
identifier.getInterval(),
schema,
identifier.getShardSpec(),
storeCompactionState ? new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap) : null,
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
@ -796,8 +756,7 @@ public class AppenderatorImpl implements Appenderator
// semantics.
() -> dataSegmentPusher.push(
mergedFile,
sink.getSegment()
.withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
useUniquePath
),
exception -> exception instanceof Exception,
@ -1104,6 +1063,7 @@ public class AppenderatorImpl implements Appenderator
identifier.getInterval(),
schema,
identifier.getShardSpec(),
null,
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,

View File
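For readers following the AppenderatorImpl hunk above: when storeCompactionState is true, each new Sink is handed a CompactionState built from the task's partitionsSpec plus its indexSpec serialized to a plain map. The sketch below assembles an equivalent object using only constructors visible in this diff; the mapper choice, the placeholder partition values, and the class name CompactionStateSketch are illustrative, not Druid code.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.timeline.CompactionState;
import java.util.Map;

public class CompactionStateSketch
{
  public static void main(String[] args)
  {
    // The real code uses the task's injected mapper; DefaultObjectMapper stands in here.
    final ObjectMapper mapper = new DefaultObjectMapper();
    // Serialize the effective indexSpec to a generic map, mirroring the convertValue call in the hunk above.
    final Map<String, Object> indexSpecMap = mapper.convertValue(
        new IndexSpec(),
        new TypeReference<Map<String, Object>>() {}
    );
    // Placeholder dynamic partitions spec; a compaction task would take these values from its tuningConfig.
    final CompactionState state = new CompactionState(
        new DynamicPartitionsSpec(5_000_000, null),
        indexSpecMap
    );
    System.out.println(state);
  }
}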

@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
@ -32,6 +33,7 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.concurrent.ExecutorService;
@ -57,24 +59,34 @@ public class Appenderators
return new AppenderatorImpl(
schema,
config,
false,
metrics,
dataSegmentPusher,
objectMapper,
conglomerate,
segmentAnnouncer,
emitter,
queryExecutorService,
new SinkQuerySegmentWalker(
schema.getDataSource(),
new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
),
objectMapper,
emitter,
conglomerate,
queryExecutorService,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,
cachePopulatorStats
),
indexIO,
indexMerger,
cache,
cacheConfig,
cachePopulatorStats
cache
);
}
public static Appenderator createOffline(
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@ -85,10 +97,10 @@ public class Appenderators
return new AppenderatorImpl(
schema,
config,
storeCompactionState,
metrics,
dataSegmentPusher,
objectMapper,
null,
new DataSegmentAnnouncer()
{
@Override
@ -116,11 +128,8 @@ public class Appenderators
}
},
null,
null,
indexIO,
indexMerger,
null,
null,
null
);
}

View File

@ -86,6 +86,7 @@ public interface AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,

View File

@ -54,6 +54,15 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
@Override
public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics)
{
return Appenderators.createOffline(schema, config, metrics, dataSegmentPusher, objectMapper, indexIO, indexMerger);
return Appenderators.createOffline(
schema,
config,
false,
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
indexMerger
);
}
}

View File

@ -78,6 +78,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
String taskId,
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,

View File

@ -106,6 +106,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@ -120,6 +121,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
batchAppenderator = Appenderators.createOffline(
schema,
config,
storeCompactionState,
metrics,
dataSegmentPusher,
objectMapper,

View File

@ -31,6 +31,7 @@ import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
@ -164,6 +165,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
Appenderator appenderator = new AppenderatorImpl(
schema,
rewriteAppenderatorConfigMemoryLimits(config),
false,
metrics,
dataSegmentPusher,
objectMapper,
@ -184,6 +186,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
boolean storeCompactionState,
FireDepartmentMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@ -194,14 +197,13 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
synchronized (this) {
DatasourceBundle datasourceBundle = datasourceBundles.computeIfAbsent(
schema.getDataSource(),
(datasource) -> {
return new DatasourceBundle(datasource);
}
DatasourceBundle::new
);
Appenderator appenderator = Appenderators.createOffline(
schema,
rewriteAppenderatorConfigMemoryLimits(config),
storeCompactionState,
metrics,
dataSegmentPusher,
objectMapper,
@ -397,6 +399,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
return baseConfig.getMaxTotalRows();
}
@Override
public PartitionsSpec getPartitionsSpec()
{
return baseConfig.getPartitionsSpec();
}
@Override
public Period getIntermediatePersistPeriod()
{
@ -471,7 +479,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
ListenableFuture<File> mergeFuture = mergeExecutor.submit(
new Callable<File>()
@ -511,7 +519,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
ListenableFuture<File> mergeFuture = mergeExecutor.submit(
new Callable<File>()
@ -550,7 +558,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}
@ -560,7 +568,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
File inDir,
File outDir,
IndexSpec indexSpec
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}
@ -572,7 +580,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}
@ -583,7 +591,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}
@ -596,7 +604,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
IndexSpec indexSpec,
ProgressIndicator progress,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}
@ -610,7 +618,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
IndexSpec indexSpec,
ProgressIndicator progress,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
)
{
throw new UOE(ERROR_MSG);
}

View File

@ -717,6 +717,7 @@ public class RealtimePlumber implements Plumber
sinkInterval,
schema,
config.getShardSpec(),
null,
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),

View File

@ -36,11 +36,13 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -62,6 +64,8 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
private final Interval interval;
private final DataSchema schema;
private final ShardSpec shardSpec;
@Nullable
private final CompactionState compactionState;
private final String version;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
@ -85,22 +89,51 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
String dedupColumn
)
{
this.schema = schema;
this.shardSpec = shardSpec;
this.interval = interval;
this.version = version;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.reportParseExceptions = reportParseExceptions;
this.dedupColumn = dedupColumn;
makeNewCurrIndex(interval.getStartMillis(), schema);
this(
interval,
schema,
shardSpec,
null,
version,
maxRowsInMemory,
maxBytesInMemory,
reportParseExceptions,
dedupColumn,
Collections.emptyList()
);
}
public Sink(
Interval interval,
DataSchema schema,
ShardSpec shardSpec,
@Nullable CompactionState compactionState,
String version,
int maxRowsInMemory,
long maxBytesInMemory,
boolean reportParseExceptions,
String dedupColumn
)
{
this(
interval,
schema,
shardSpec,
compactionState,
version,
maxRowsInMemory,
maxBytesInMemory,
reportParseExceptions,
dedupColumn,
Collections.emptyList()
);
}
public Sink(
Interval interval,
DataSchema schema,
ShardSpec shardSpec,
@Nullable CompactionState compactionState,
String version,
int maxRowsInMemory,
long maxBytesInMemory,
@ -111,6 +144,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
{
this.schema = schema;
this.shardSpec = shardSpec;
this.compactionState = compactionState;
this.interval = interval;
this.version = version;
this.maxRowsInMemory = maxRowsInMemory;
@ -244,6 +278,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
Collections.emptyList(),
Lists.transform(Arrays.asList(schema.getAggregators()), AggregatorFactory::getName),
shardSpec,
compactionState,
null,
0
);

View File

@ -43,8 +43,6 @@ public class DataSourceCompactionConfig
private final String dataSource;
private final int taskPriority;
private final long inputSegmentSizeBytes;
@Nullable
private final Long targetCompactionSizeBytes;
// The number of input segments is limited because the byte size of a serialized task spec is limited by
// RemoteTaskRunnerConfig.maxZnodeBytes.
@Nullable
@ -58,7 +56,6 @@ public class DataSourceCompactionConfig
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskPriority") @Nullable Integer taskPriority,
@JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig,
@ -72,62 +69,12 @@ public class DataSourceCompactionConfig
this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
: inputSegmentSizeBytes;
this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(
targetCompactionSizeBytes,
maxRowsPerSegment,
tuningConfig
);
this.maxRowsPerSegment = maxRowsPerSegment;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.taskContext = taskContext;
}
/**
* This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this
* method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}.
*
* Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
* the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}.
*/
@Nullable
private static Long getValidTargetCompactionSizeBytes(
@Nullable Long targetCompactionSizeBytes,
@Nullable Integer maxRowsPerSegment,
@Nullable UserCompactTuningConfig tuningConfig
)
{
if (targetCompactionSizeBytes != null) {
Preconditions.checkArgument(
!hasPartitionConfig(maxRowsPerSegment, tuningConfig),
"targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
targetCompactionSizeBytes,
maxRowsPerSegment,
tuningConfig == null ? null : tuningConfig.getMaxTotalRows()
);
return targetCompactionSizeBytes;
} else {
return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
}
}
/**
* This method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are
* 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and
* 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by
* {@link UserCompactTuningConfig}.
*
* Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
* the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}.
*/
private static boolean hasPartitionConfig(
@Nullable Integer maxRowsPerSegment,
@Nullable UserCompactTuningConfig tuningConfig
)
{
return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null);
}
@JsonProperty
public String getDataSource()
{
@ -146,13 +93,6 @@ public class DataSourceCompactionConfig
return inputSegmentSizeBytes;
}
@JsonProperty
@Nullable
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@JsonProperty
@Nullable
public Integer getMaxRowsPerSegment()
@ -193,7 +133,6 @@ public class DataSourceCompactionConfig
return taskPriority == that.taskPriority &&
inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(taskContext, that.taskContext);
@ -206,7 +145,6 @@ public class DataSourceCompactionConfig
dataSource,
taskPriority,
inputSegmentSizeBytes,
targetCompactionSizeBytes,
skipOffsetFromLatest,
tuningConfig,
taskContext

View File

@ -167,7 +167,8 @@ public class DruidCoordinator
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector
@Coordinator DruidLeaderSelector coordLeaderSelector,
DruidCoordinatorSegmentCompactor segmentCompactor
)
{
this(
@ -188,7 +189,8 @@ public class DruidCoordinator
indexingServiceHelpers,
factory,
lookupCoordinatorManager,
coordLeaderSelector
coordLeaderSelector,
segmentCompactor
);
}
@ -210,7 +212,8 @@ public class DruidCoordinator
Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector
DruidLeaderSelector coordLeaderSelector,
DruidCoordinatorSegmentCompactor segmentCompactor
)
{
this.config = config;
@ -235,7 +238,7 @@ public class DruidCoordinator
this.lookupCoordinatorManager = lookupCoordinatorManager;
this.coordLeaderSelector = coordLeaderSelector;
this.segmentCompactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient);
this.segmentCompactor = segmentCompactor;
}
public boolean isLeader()

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.helper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
@ -37,6 +38,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -51,16 +53,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
// Should be synced with CompactionTask.TYPE
private static final String COMPACT_TASK_TYPE = "compact";
// Should be synced with Tasks.STORE_COMPACTION_STATE_KEY
private static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
private static final Logger LOG = new Logger(DruidCoordinatorSegmentCompactor.class);
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
private final CompactionSegmentSearchPolicy policy;
private final IndexingServiceClient indexingServiceClient;
private Object2LongMap<String> remainingSegmentSizeBytes;
@Inject
public DruidCoordinatorSegmentCompactor(IndexingServiceClient indexingServiceClient)
public DruidCoordinatorSegmentCompactor(
ObjectMapper objectMapper,
IndexingServiceClient indexingServiceClient
)
{
this.policy = new NewestSegmentFirstPolicy(objectMapper);
this.indexingServiceClient = indexingServiceClient;
}
@ -158,33 +166,39 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
final List<DataSegment> segmentsToCompact = iterator.next();
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
if (segmentsToCompact.size() > 1) {
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
config.getTargetCompactionSizeBytes(),
config.getTaskPriority(),
ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
config.getTaskContext()
newAutoCompactionContext(config.getTaskContext())
);
LOG.info(
"Submitted a compactTask[%s] for segments %s",
taskId,
Iterables.transform(segmentsToCompact, DataSegment::getId)
);
} else if (segmentsToCompact.size() == 1) {
throw new ISE("Found one segments[%s] to compact", segmentsToCompact);
} else {
throw new ISE("Failed to find segments for dataSource[%s]", dataSourceName);
throw new ISE("segmentsToCompact is empty?");
}
}
return makeStats(numSubmittedTasks, iterator);
}
private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext)
{
final Map<String, Object> newContext = configuredContext == null
? new HashMap<>()
: new HashMap<>(configuredContext);
newContext.put(STORE_COMPACTION_STATE_KEY, true);
return newContext;
}
private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
{
final CoordinatorStats stats = new CoordinatorStats();

View File
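The compactor change above hinges on a single task-context flag: the coordinator forces it on for every auto compaction task, and the compaction task (see CompactionTaskRunTest earlier in this diff) reads it and passes storeCompactionState down to Appenderators.createOffline. A minimal illustration of the context the compactor submits, assuming an empty user-configured context; the key string matches Tasks.STORE_COMPACTION_STATE_KEY, everything else is illustrative.

import java.util.HashMap;
import java.util.Map;

public class AutoCompactionContextSketch
{
  public static void main(String[] args)
  {
    // Start from the user-configured task context (empty here) and force the flag on,
    // so segments written by auto compaction always record their lastCompactionState.
    final Map<String, Object> context = new HashMap<>();
    context.put("storeCompactionState", true);
    System.out.println(context); // {storeCompactionState=true}
  }
}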

@ -19,15 +19,20 @@
package org.apache.druid.server.coordinator.helper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -45,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -56,6 +62,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(NewestSegmentFirstIterator.class);
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
@ -69,11 +76,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
);
NewestSegmentFirstIterator(
ObjectMapper objectMapper,
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
Map<String, List<Interval>> skipIntervals
)
{
this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs;
this.dataSources = dataSources;
this.timelineIterators = new HashMap<>(dataSources.size());
@ -84,7 +93,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
if (config != null && !timeline.isEmpty()) {
final List<Interval> searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
final List<Interval> searchIntervals = findInitialSearchInterval(
timeline,
config.getSkipOffsetFromLatest(),
skipIntervals.get(dataSource)
);
if (!searchIntervals.isEmpty()) {
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
}
@ -175,14 +188,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
config
);
if (segmentsToCompact.getNumSegments() > 1) {
if (!segmentsToCompact.isEmpty()) {
queue.add(new QueueEntry(segmentsToCompact.segments));
}
}
/**
* Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
* which means the holder always has at least two {@link DataSegment}s.
* which means the holder always has at least one {@link DataSegment}.
*/
private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
{
@ -201,7 +214,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.filter(holder -> {
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
return chunks.size() > 1
return !chunks.isEmpty()
&& partitionBytes > 0
&& interval.contains(chunks.get(0).getObject().getInterval());
})
@ -229,61 +242,115 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
}
private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final int maxRowsPerSegment = config.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: config.getMaxRowsPerSegment();
@Nullable Long maxTotalRows = config.getTuningConfig() == null
? null
: config.getTuningConfig().getMaxTotalRows();
maxTotalRows = maxTotalRows == null ? Long.MAX_VALUE : maxTotalRows;
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0));
return true;
}
final boolean allCandidatesHaveSameLastCompactionState = candidates
.segments
.stream()
.allMatch(segment -> lastCompactionState.equals(segment.getLastCompactionState()));
if (!allCandidatesHaveSameLastCompactionState) {
log.info("Candidates[%s] were compacted with different partitions spec. Needs compaction.", candidates.segments);
return true;
}
final PartitionsSpec segmentPartitionsSpec = lastCompactionState.getPartitionsSpec();
if (!(segmentPartitionsSpec instanceof DynamicPartitionsSpec)) {
log.info(
"Candidate segment[%s] was compacted with a non dynamic partitions spec. Needs compaction.",
candidates.segments.get(0)
);
return true;
}
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) segmentPartitionsSpec;
final IndexSpec segmentIndexSpec = objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class);
final IndexSpec configuredIndexSpec;
if (config.getTuningConfig() == null || config.getTuningConfig().getIndexSpec() == null) {
configuredIndexSpec = new IndexSpec();
} else {
configuredIndexSpec = config.getTuningConfig().getIndexSpec();
}
boolean needsCompaction = false;
if (!Objects.equals(maxRowsPerSegment, dynamicPartitionsSpec.getMaxRowsPerSegment())
|| !Objects.equals(maxTotalRows, dynamicPartitionsSpec.getMaxTotalRows())) {
log.info(
"Configured maxRowsPerSegment[%s] and maxTotalRows[%s] are differenet from "
+ "the partitionsSpec[%s] of segments. Needs compaction.",
maxRowsPerSegment,
maxTotalRows,
dynamicPartitionsSpec
);
needsCompaction = true;
}
// segmentIndexSpec cannot be null.
if (!segmentIndexSpec.equals(configuredIndexSpec)) {
log.info(
"Configured indexSpec[%s] is different from the one[%s] of segments. Needs compaction",
configuredIndexSpec,
segmentIndexSpec
);
needsCompaction = true;
}
return needsCompaction;
}
/**
* Find segments to compact together for the given intervalToSearch. It progressively searches the given
* intervalToSearch in time order (latest first). The timeline lookup duration is one day, meaning the timeline is
* looked up for the last one day of the given intervalToSearch, and the next day is searched if the segments found
* so far are not enough to compact. This is repeated until enough segments are found.
*
* @param compactibleTimelineObjectHolderCursor timeline iterator
* @param config compaction config
*
* @return segments to compact
*/
private static SegmentsToCompact findSegmentsToCompact(
private SegmentsToCompact findSegmentsToCompact(
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
{
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()) {
final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
targetCompactionSizeBytes,
candidates.segments
);
if (isCompactibleSize && needsCompaction) {
return candidates;
if (!candidates.isEmpty()) {
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean needsCompaction = needsCompaction(config, candidates);
if (isCompactibleSize && needsCompaction) {
return candidates;
} else {
if (!isCompactibleSize) {
log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.",
candidates.getTotalSize(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
inputSegmentSize
);
}
}
} else {
if (!isCompactibleSize) {
log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.",
candidates.getTotalSize(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
inputSegmentSize
);
}
if (!needsCompaction) {
log.warn(
"Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "
+ "for datasource[%s] and interval[%s]. Skipping compaction for this interval.",
candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()),
targetCompactionSizeBytes,
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval()
);
}
throw new ISE("No segment is found?");
}
}
// Return an empty set if nothing is found
log.info("All segments look good! Nothing to compact");
return new SegmentsToCompact();
}
@ -458,6 +525,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
}
private boolean isEmpty()
{
return segments.isEmpty();
}
private int getNumSegments()
{
return segments.size();

View File
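Because the hunk above interleaves removed and added lines, the new decision rule is easier to see in isolation: a candidate needs compaction when it has no stored lastCompactionState, when that state was produced with a non-dynamic partitionsSpec, or when the stored maxRowsPerSegment, maxTotalRows, or indexSpec differ from the currently configured values. The standalone sketch below restates that rule against the public getters used in the diff; it is a simplified illustration, not the package-private NewestSegmentFirstIterator method, and the class and parameter names are invented.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.timeline.CompactionState;
import java.util.Objects;

public class NeedsCompactionSketch
{
  // True when the segment's stored state does not match the currently configured specs.
  static boolean needsCompaction(
      ObjectMapper mapper,
      CompactionState lastCompactionState,
      Integer configuredMaxRowsPerSegment,
      Long configuredMaxTotalRows,
      IndexSpec configuredIndexSpec
  )
  {
    if (lastCompactionState == null) {
      return true; // never compacted, or compacted before the state was recorded
    }
    final PartitionsSpec storedSpec = lastCompactionState.getPartitionsSpec();
    if (!(storedSpec instanceof DynamicPartitionsSpec)) {
      return true; // compacted with a non-dynamic partitions spec
    }
    final DynamicPartitionsSpec dynamic = (DynamicPartitionsSpec) storedSpec;
    // The stored indexSpec is kept as a generic map; deserialize it before comparing.
    final IndexSpec storedIndexSpec = mapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class);
    return !Objects.equals(configuredMaxRowsPerSegment, dynamic.getMaxRowsPerSegment())
           || !Objects.equals(configuredMaxTotalRows, dynamic.getMaxTotalRows())
           || !storedIndexSpec.equals(configuredIndexSpec);
  }

  public static void main(String[] args)
  {
    // A segment with no stored state always needs compaction, so this prints true.
    System.out.println(
        needsCompaction(new DefaultObjectMapper(), null, 5_000_000, Long.MAX_VALUE, new IndexSpec())
    );
  }
}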

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.helper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -32,6 +33,13 @@ import java.util.Map;
*/
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{
private final ObjectMapper objectMapper;
public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
@Override
public CompactionSegmentIterator reset(
Map<String, DataSourceCompactionConfig> compactionConfigs,
@ -39,6 +47,6 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
Map<String, List<Interval>> skipIntervals
)
{
return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals);
return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
}
}

View File

@ -20,38 +20,13 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
/**
* Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
*/
class SegmentCompactorUtil
{
/**
* The allowed error rate of the segment size after compaction.
* Its value is determined experimentally.
*/
private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;
static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List<DataSegment> candidates)
{
if (targetCompactionSizeBytes == null) {
// If targetCompactionSizeBytes is null, we have no way to check that the given segments need compaction or not.
return true;
}
final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
return candidates
.stream()
.filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold)
.count() > 1;
}
/**
* Removes {@code smallInterval} from {@code largeInterval}. The end of both intervals should be the same.
*

View File

@ -1761,6 +1761,7 @@ public class CachingClusteredClientTest
null,
null,
new SingleDimensionShardSpec(dimension, start, end, partitionNum),
null,
9,
0L
);

View File

@ -28,7 +28,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.test.utils.ImmutableDruidDataSourceTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -48,7 +48,7 @@ public class ImmutableDruidDataSourceTest
final ImmutableDruidDataSource dataSource = getImmutableDruidDataSource(segment);
final ObjectMapper objectMapper = new DefaultObjectMapper()
.setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT));
.setInjectableValues(new Std().addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT));
final String json = objectMapper.writeValueAsString(dataSource);
ImmutableDruidDataSourceTestUtils.assertEquals(dataSource, objectMapper.readValue(json,
@ -84,16 +84,17 @@ public class ImmutableDruidDataSourceTest
private DataSegment getTestSegment()
{
return new DataSegment(
"test",
Intervals.of("2017/2018"),
"version",
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
null,
1,
100L,
PruneLoadSpecHolder.DEFAULT
"test",
Intervals.of("2017/2018"),
"version",
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
null,
null,
1,
100L,
PruneSpecsHolder.DEFAULT
);
}

View File

@ -47,7 +47,6 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -35,7 +35,7 @@ import java.io.IOException;
public class SegmentPublishResultTest
{
private final ObjectMapper objectMapper = new DefaultObjectMapper()
.setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT));
.setInjectableValues(new Std().addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT));
@Test
public void testSerdeOkResult() throws IOException

View File

@ -22,7 +22,7 @@ package org.apache.druid.server;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
public class ServerTestHelper
{
@ -32,7 +32,7 @@ public class ServerTestHelper
MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(ObjectMapper.class.getName(), MAPPER)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
);
}
}

View File

@ -65,7 +65,7 @@ public class SegmentChangeRequestDropTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("drop", objectMap.get("action"));
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));

View File

@ -64,7 +64,7 @@ public class SegmentChangeRequestLoadTest
mapper.writeValueAsString(segmentDrop), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("load", objectMap.get("action"));
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
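Both change-request tests bump the expected key count from 11 to 12 because a serialized DataSegment now carries a lastCompactionState field alongside its previous eleven; it serializes as null for segments that have never been auto compacted, as the ServersResourceTest expectations further down show.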

View File

@ -293,7 +293,7 @@ public class BatchDataSegmentAnnouncerTest
}
List<String> zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH);
Assert.assertEquals(20, zNodes.size());
Assert.assertEquals(25, zNodes.size());
Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {

View File

@ -249,7 +249,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector()
new TestDruidLeaderSelector(),
null
);
}
@ -546,7 +547,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector()
new TestDruidLeaderSelector(),
null
);
}

View File

@ -49,7 +49,6 @@ public class DataSourceCompactionConfigTest
"dataSource",
null,
500L,
100L,
null,
new Period(3600),
null,
@ -61,7 +60,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@ -75,7 +73,6 @@ public class DataSourceCompactionConfigTest
"dataSource",
null,
500L,
null,
30,
new Period(3600),
null,
@ -87,7 +84,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@ -113,7 +109,6 @@ public class DataSourceCompactionConfigTest
null,
500L,
null,
null,
new Period(3600),
new UserCompactTuningConfig(
null,
@ -131,58 +126,12 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
}
@Test
public void testSerdeTargetCompactionSizeBytesWithMaxRowsPerSegment()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]"
);
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
10000L,
1000,
new Period(3600),
null,
ImmutableMap.of("key", "val")
);
}
@Test
public void testSerdeTargetCompactionSizeBytesWithMaxTotalRows()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]"
);
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
10000L,
null,
new Period(3600),
new UserCompactTuningConfig(
null,
null,
10000L,
null,
null,
null
),
ImmutableMap.of("key", "val")
);
}
@Test
public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
{
@ -190,7 +139,6 @@ public class DataSourceCompactionConfigTest
"dataSource",
null,
500L,
null,
10000,
new Period(3600),
new UserCompactTuningConfig(
@ -210,7 +158,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
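With targetCompactionSizeBytes gone, the constructor used throughout this test takes (dataSource, taskPriority, inputSegmentSizeBytes, maxRowsPerSegment, skipOffsetFromLatest, tuningConfig, taskContext). A sketch of building a config in that order; the values are illustrative, and a null taskPriority falls back to the default of 25 per the assertions above:

// Sketch based on the constructor calls in this test; values are illustrative.
DataSourceCompactionConfig config = new DataSourceCompactionConfig(
    "dataSource",
    null,                          // taskPriority, asserted to default to 25
    500L,                          // inputSegmentSizeBytes
    10000,                         // maxRowsPerSegment
    new Period(3600),              // skipOffsetFromLatest
    null,                          // tuningConfig
    ImmutableMap.of("key", "val")  // taskContext
);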

View File

@ -214,7 +214,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector()
new TestDruidLeaderSelector(),
null
);
}

View File

@ -28,6 +28,8 @@ import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
@ -35,6 +37,7 @@ import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -47,7 +50,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -66,7 +68,6 @@ public class DruidCoordinatorSegmentCompactorTest
@Override
public String compactSegments(
List<DataSegment> segments,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
ClientCompactQueryTuningConfig tuningConfig,
Map<String, Object> context
@ -97,6 +98,22 @@ public class DruidCoordinatorSegmentCompactorTest
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
new NumberedShardSpec(i, 0),
new CompactionState(
new DynamicPartitionsSpec(
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "concise"),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
),
1,
segmentSize
);
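These fake segments now record a lastCompactionState, which is what the stateful policy compares against the state the current compaction config would produce. Roughly the kind of check involved; getLastCompactionState and the equality-based comparison are assumptions for illustration, not verified API:

// Illustrative only; method names and the exact comparison are assumptions.
CompactionState last = segment.getLastCompactionState();
CompactionState desired = new CompactionState(partitionsSpecFromConfig, indexSpecFromConfig);
boolean alreadyCompacted = last != null && last.equals(desired);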
@ -178,7 +195,10 @@ public class DruidCoordinatorSegmentCompactorTest
@Test
public void testRun()
{
final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient);
final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(
new DefaultObjectMapper(),
indexingServiceClient
);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@ -375,7 +395,6 @@ public class DruidCoordinatorSegmentCompactorTest
dataSource,
0,
50L,
20L,
null,
new Period("PT1H"), // smaller than segment interval
null,

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.helper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
@ -49,7 +50,7 @@ public class NewestSegmentFirstPolicyTest
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4;
private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy();
private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
@Test
public void testLargeOffsetAndSmallSegmentInterval()
@ -280,73 +281,60 @@ public class NewestSegmentFirstPolicyTest
);
}
@Test
public void testIgnoreSingleSegmentToCompact()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
new Period("P1D"),
200,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
new Period("P1D"),
200,
1
)
)
),
Collections.emptyMap()
);
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testClearSegmentsToCompactWhenSkippingSegments()
{
final long maxSizeOfSegmentsToCompact = 800000;
final long inputSegmentSizeBytes = 800000;
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"),
new Period("P1D"),
maxSizeOfSegmentsToCompact / 2 + 10,
inputSegmentSizeBytes / 2 + 10,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
new Period("P1D"),
maxSizeOfSegmentsToCompact + 10, // large segment
inputSegmentSizeBytes + 10, // large segment
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
new Period("P1D"),
maxSizeOfSegmentsToCompact / 3 + 10,
inputSegmentSizeBytes / 3 + 10,
2
)
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, new Period("P0D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
final List<DataSegment> expectedSegmentsToCompact = timeline
.lookup(Intervals.of("2017-12-01/2017-12-02"))
.stream()
.flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>();
expectedSegmentsToCompact.addAll(
timeline
.lookup(Intervals.of("2017-12-03/2017-12-04"))
.stream()
.flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
.map(PartitionChunk::getObject)
.collect(Collectors.toList())
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(expectedSegmentsToCompact, iterator.next());
expectedSegmentsToCompact.clear();
expectedSegmentsToCompact.addAll(
timeline
.lookup(Intervals.of("2017-12-01/2017-12-02"))
.stream()
.flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
.map(PartitionChunk::getObject)
.collect(Collectors.toList())
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(expectedSegmentsToCompact, iterator.next());
Assert.assertFalse(iterator.hasNext());
}
@ -569,15 +557,14 @@ public class NewestSegmentFirstPolicyTest
}
private DataSourceCompactionConfig createCompactionConfig(
long targetCompactionSizeBytes,
long inputSegmentSizeBytes,
Period skipOffsetFromLatest
)
{
return new DataSourceCompactionConfig(
DATA_SOURCE,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
inputSegmentSizeBytes,
null,
skipOffsetFromLatest,
null,

View File

@ -73,7 +73,7 @@ public class ServersResourceTest
+ "\"priority\":0,"
+ "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
+ "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\","
+ "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+ "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"lastCompactionState\":null,\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+ "\"currSize\":1}]";
Assert.assertEquals(expected, result);
}
@ -99,7 +99,7 @@ public class ServersResourceTest
+ "\"priority\":0,"
+ "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
+ "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\","
+ "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+ "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"lastCompactionState\":null,\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+ "\"currSize\":1}";
Assert.assertEquals(expected, result);
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.apache.druid.timeline.PruneLoadSpec;
import org.eclipse.jetty.server.Server;
@ -88,6 +89,7 @@ public class CliBroker extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8282);
binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);
binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
LifecycleModule.register(binder, BrokerServerView.class);

View File

@ -146,8 +146,7 @@ public class CliCoordinator extends ServerRunnable
ConfigProvider.bind(binder, DruidCoordinatorConfig.class);
binder.bind(MetadataStorage.class)
.toProvider(MetadataStorageProvider.class);
binder.bind(MetadataStorage.class).toProvider(MetadataStorageProvider.class);
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);

View File

@ -49,6 +49,7 @@ import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
import java.util.List;
@ -79,6 +80,7 @@ public class CliHistorical extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/historical");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8083);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8283);
binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
// register Server before binding ZkCoordinator to ensure HTTP endpoints are available immediately
LifecycleModule.register(binder, Server.class);

View File

@ -64,6 +64,7 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
import java.util.List;
@ -96,6 +97,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/middlemanager");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8091);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8291);
binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
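The same one-line binding is added to CliBroker, CliHistorical, and CliMiddleManager; these processes presumably have no use for the stored compaction state, so, like PruneLoadSpec on the broker, they prune it at deserialization time to keep in-memory segments and announcements small:

binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);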

View File

@ -43,6 +43,7 @@ import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.server.DruidNode;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import javax.annotation.Nullable;
import javax.xml.bind.DatatypeConverter;
@ -190,7 +191,7 @@ public class ExportMetadata extends GuiceRunnable
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(ObjectMapper.class, JSON_MAPPER);
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT);
JSON_MAPPER.setInjectableValues(injectableValues);
if (hadoopStorageDirectory != null && newLocalPath != null) {

View File

@ -53,6 +53,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -178,9 +179,10 @@ public class DruidSchemaTest extends CalciteTestBase
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
new NumberedShardSpec(2, 3),
null,
1,
100L,
DataSegment.PruneLoadSpecHolder.DEFAULT
PruneSpecsHolder.DEFAULT
);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment1);
final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);

View File

@ -228,7 +228,7 @@ exports[`compaction dialog matches snapshot 1`] = `
<label
class="bp3-label"
>
Target compaction size bytes
Max rows per segment
<span
class="bp3-text-muted"
@ -277,7 +277,7 @@ exports[`compaction dialog matches snapshot 1`] = `
min="0"
style="padding-right: 10px;"
type="text"
value="419430400"
value="5000000"
/>
</div>
<div

View File

@ -40,7 +40,7 @@ export class CompactionDialog extends React.PureComponent<
CompactionDialogProps,
CompactionDialogState
> {
static DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 419430400;
static DEFAULT_MAX_ROWS_PER_SEGMENT = 5000000;
constructor(props: CompactionDialogProps) {
super(props);
@ -108,9 +108,9 @@ export class CompactionDialog extends React.PureComponent<
),
},
{
name: 'targetCompactionSizeBytes',
name: 'maxRowsPerSegment',
type: 'number',
defaultValue: CompactionDialog.DEFAULT_TARGET_COMPACTION_SIZE_BYTES,
defaultValue: CompactionDialog.DEFAULT_MAX_ROWS_PER_SEGMENT,
info: (
<p>
The target segment size, for each segment, after compaction. The actual sizes of

View File

@ -874,12 +874,12 @@ GROUP BY 1`;
const { compaction } = row.original;
let text: string;
if (compaction) {
if (compaction.targetCompactionSizeBytes == null) {
text = `Target: Default (${formatBytes(
CompactionDialog.DEFAULT_TARGET_COMPACTION_SIZE_BYTES,
if (compaction.maxRowsPerSegment == null) {
text = `Target: Default (${formatNumber(
CompactionDialog.DEFAULT_MAX_ROWS_PER_SEGMENT,
)})`;
} else {
text = `Target: ${formatBytes(compaction.targetCompactionSizeBytes)}`;
text = `Target: ${formatNumber(compaction.maxRowsPerSegment)}`;
}
} else {
text = 'None';