Use hash of Segment IDs instead of a list of explicit segments in auto compaction (#8571)

* IOConfig for compaction task

* add javadoc, doc, unit test

* fix webconsole test

* add spelling

* address comments

* fix build and test

* address comments
Jihoon Son 2019-10-09 11:12:00 -07:00 committed by Gian Merlino
parent 526f04c47c
commit 96d8523ecb
34 changed files with 1354 additions and 603 deletions

View File

@ -99,7 +99,6 @@ public class NewestSegmentFirstPolicyBenchmark
null,
null,
null,
null,
null
)
);

View File

@ -17,20 +17,41 @@
* under the License.
*/
package org.apache.druid.indexer.partitions;
package org.apache.druid.indexer;
import org.apache.druid.java.util.common.IAE;
import java.util.List;
/**
* Various helper methods useful for checking the validity of arguments to spec constructors.
*/
class Checks
public final class Checks
{
public static <T> Property<T> checkOneNotNullOrEmpty(List<Property<T>> properties)
{
Property<T> nonNullProperty = null;
for (Property<T> property : properties) {
if (!property.isValueNullOrEmptyCollection()) {
if (nonNullProperty == null) {
nonNullProperty = property;
} else {
throw new IAE("At most one of %s must be present", properties);
}
}
}
if (nonNullProperty == null) {
throw new IAE("At most one of %s must be present", properties);
}
return nonNullProperty;
}
/**
* @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
*/
@SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if'
static Property<Integer> checkAtMostOneNotNull(Property<Integer> property1, Property<Integer> property2)
public static <T> Property<T> checkAtMostOneNotNull(Property<T> property1, Property<T> property2)
{
final Property<Integer> property;
final Property<T> property;
boolean isNull1 = property1.getValue() == null;
boolean isNull2 = property2.getValue() == null;
@ -42,9 +63,7 @@ class Checks
} else if (isNull2) {
property = property1;
} else {
throw new IllegalArgumentException(
"At most one of " + property1.getName() + " or " + property2.getName() + " must be present"
);
throw new IAE("At most one of [%s] or [%s] must be present", property1, property2);
}
return property;
@ -53,10 +72,14 @@ class Checks
/**
* @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
*/
static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2)
public static <T> Property<T> checkAtMostOneNotNull(String name1, T value1, String name2, T value2)
{
Property<Integer> property1 = new Property<>(name1, value1);
Property<Integer> property2 = new Property<>(name2, value2);
Property<T> property1 = new Property<>(name1, value1);
Property<T> property2 = new Property<>(name2, value2);
return checkAtMostOneNotNull(property1, property2);
}
private Checks()
{
}
}

View File

@ -17,19 +17,20 @@
* under the License.
*/
package org.apache.druid.indexer.partitions;
package org.apache.druid.indexer;
import java.util.Collection;
import java.util.Objects;
/**
* Convenience class for holding a pair of string key and templated value.
*/
class Property<T>
public class Property<T>
{
private final String name;
private final T value;
Property(String name, T value)
public Property(String name, T value)
{
this.name = name;
this.value = value;
@ -45,6 +46,17 @@ class Property<T>
return value;
}
public boolean isValueNullOrEmptyCollection()
{
if (value == null) {
return true;
}
if (value instanceof Collection) {
return ((Collection) value).isEmpty();
}
return false;
}
@Override
public boolean equals(Object o)
{
@ -64,4 +76,13 @@ class Property<T>
{
return Objects.hash(name, value);
}
@Override
public String toString()
{
return "Property{" +
"name='" + name + '\'' +
", value=" + value +
'}';
}
}

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import javax.annotation.Nullable;
import java.util.Collections;

View File

@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

View File

@ -19,15 +19,23 @@
package org.apache.druid.segment;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
/**
* Utility methods useful for implementing deep storage extensions.
@ -35,6 +43,19 @@ import java.io.InputStream;
@PublicApi
public class SegmentUtils
{
private static final HashFunction HASH_FUNCTION = Hashing.sha256();
/**
* Hash the IDs of the given segments using the SHA-256 algorithm.
*/
public static String hashIds(List<DataSegment> segments)
{
Collections.sort(segments);
final Hasher hasher = HASH_FUNCTION.newHasher();
segments.forEach(segment -> hasher.putString(segment.getId().toString(), StandardCharsets.UTF_8));
return StringUtils.fromUtf8(hasher.hash().asBytes());
}
public static int getVersionFromDir(File inDir) throws IOException
{
File versionFile = new File(inDir, "version.bin");
@ -53,4 +74,8 @@ public class SegmentUtils
throw new IOE("Invalid segment dir [%s]. Can't find either of version.bin or index.drd.", inDir);
}
private SegmentUtils()
{
}
}
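
For orientation, here is a minimal, hypothetical sketch (not part of this commit) of how `hashIds` might be used to produce the `sha256OfSortedSegmentIds` value consumed by `CompactionIntervalSpec` below. The segments, their metadata, and the `main` wrapper are made up for illustration; the 9-argument `DataSegment` constructor mirrors the one used in the tests in this commit.

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import java.util.ArrayList;
import java.util.List;

public class HashIdsSketch
{
  public static void main(String[] args)
  {
    // Two hypothetical segments of a "wikipedia" datasource; load spec, dimensions,
    // metrics, and shard spec are omitted (null), as in the tests in this commit.
    final List<DataSegment> segments = new ArrayList<>();
    segments.add(
        new DataSegment("wikipedia", Intervals.of("2017-01-01/2017-01-02"), "v1", null, null, null, null, 9, 10)
    );
    segments.add(
        new DataSegment("wikipedia", Intervals.of("2017-01-02/2017-01-03"), "v1", null, null, null, null, 9, 10)
    );

    // hashIds sorts the list in place and hashes the sorted segment IDs with SHA-256.
    // The result can be supplied as sha256OfSortedSegmentIds in a CompactionIntervalSpec
    // so a later compaction task can detect whether the published segments have changed.
    final String sha256OfSortedSegmentIds = SegmentUtils.hashIds(segments);
    System.out.println(sha256OfSortedSegmentIds);
  }
}
```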

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.List;
public class ChecksTest
{
private static final String NAME1 = "name1";
private static final Integer VALUE1 = 1;
private static final String NAME2 = "name2";
private static final Integer VALUE2 = 2;
private static final Integer NULL = null;
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void checkAtMostOneNotNullFirstNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2);
Assert.assertEquals(NAME2, result.getName());
Assert.assertEquals(VALUE2, result.getValue());
}
@Test
public void checkAtMostOneNotNullSecondNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL);
Assert.assertEquals(NAME1, result.getName());
Assert.assertEquals(VALUE1, result.getValue());
}
@Test
public void checkAtMostOneNotNullBothNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL);
Assert.assertEquals(NAME1, result.getName());
Assert.assertEquals(NULL, result.getValue());
}
@Test
public void checkAtMostOneNotNullNeitherNull()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
StringUtils.format(
"[Property{name='%s', value=%s}] or [Property{name='%s', value=%s}]",
NAME1,
VALUE1,
NAME2,
VALUE2
)
);
Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2);
}
@Test
public void testCheckOneNotNullOrEmpty()
{
final List<Property<Object>> properties = ImmutableList.of(
new Property<>("p1", null),
new Property<>("p2", 2),
new Property<>("p3", null),
new Property<>("p4", Collections.emptyList())
);
final Property<Object> property = Checks.checkOneNotNullOrEmpty(properties);
Assert.assertEquals(new Property<>("p2", 2), property);
}
@Test
public void testCheckOneNotNullOrEmptyWithTwoNonNulls()
{
final List<Property<Object>> properties = ImmutableList.of(
new Property<>("p1", null),
new Property<>("p2", 2),
new Property<>("p3", 3),
new Property<>("p4", Collections.emptyList())
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
"At most one of [Property{name='p1', value=null}, Property{name='p2', value=2}, Property{name='p3', value=3}, "
+ "Property{name='p4', value=[]}] must be present"
);
Checks.checkOneNotNullOrEmpty(properties);
}
@Test
public void testCheckOneNotNullOrEmptyWithNonNullAndNonEmpty()
{
final List<Property<Object>> properties = ImmutableList.of(
new Property<>("p1", null),
new Property<>("p2", 2),
new Property<>("p3", null),
new Property<>("p4", Lists.newArrayList(1, 2))
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
"At most one of [Property{name='p1', value=null}, Property{name='p2', value=2}, Property{name='p3', value=null}, "
+ "Property{name='p4', value=[1, 2]}] must be present"
);
Checks.checkOneNotNullOrEmpty(properties);
}
@Test
public void testCheckOneNotNullOrEmptyWithAllNulls()
{
final List<Property<Object>> properties = ImmutableList.of(
new Property<>("p1", null),
new Property<>("p2", null),
new Property<>("p3", null),
new Property<>("p4", null)
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
"At most one of [Property{name='p1', value=null}, Property{name='p2', value=null}, "
+ "Property{name='p3', value=null}, Property{name='p4', value=null}] must be present"
);
Checks.checkOneNotNullOrEmpty(properties);
}
}

View File

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer.partitions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class ChecksTest
{
private static final String NAME1 = "name1";
private static final Integer VALUE1 = 1;
private static final String NAME2 = "name2";
private static final Integer VALUE2 = 2;
private static final Integer NULL = null;
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void checkAtMostOneNotNullFirstNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2);
Assert.assertEquals(NAME2, result.getName());
Assert.assertEquals(VALUE2, result.getValue());
}
@Test
public void checkAtMostOneNotNullSecondNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL);
Assert.assertEquals(NAME1, result.getName());
Assert.assertEquals(VALUE1, result.getValue());
}
@Test
public void checkAtMostOneNotNullBothNull()
{
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL);
Assert.assertEquals(NAME1, result.getName());
Assert.assertEquals(NULL, result.getValue());
}
@Test
public void checkAtMostOneNotNullNeitherNull()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("At most one of " + NAME1 + " or " + NAME2 + " must be present");
//noinspection ConstantConditions (expected to fail)
Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2);
}
}

View File

@ -74,7 +74,7 @@ public class SingleDimensionPartitionsSpecTest
new Tester()
.targetRowsPerSegment(1)
.targetPartitionSize(1)
.testIllegalArgumentException("At most one of targetRowsPerSegment or targetPartitionSize must be present");
.testIllegalArgumentException("At most one of [Property{name='targetRowsPerSegment', value=1}] or [Property{name='targetPartitionSize', value=1}] must be present");
}
@Test
@ -83,7 +83,7 @@ public class SingleDimensionPartitionsSpecTest
new Tester()
.maxRowsPerSegment(1)
.maxPartitionSize(1)
.testIllegalArgumentException("At most one of maxRowsPerSegment or maxPartitionSize must be present");
.testIllegalArgumentException("At most one of [Property{name='maxRowsPerSegment', value=1}] or [Property{name='maxPartitionSize', value=1}] must be present");
}
@Test

View File

@ -788,7 +788,6 @@ A description of the compaction config is:
|`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 12GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)|
|`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)|
|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
|`maxNumSegmentsToCompact`|Maximum number of segments to compact together per compaction task. Since a time chunk must be processed in its entirety, if a time chunk has a total number of segments greater than this parameter, compaction will not run for that time chunk.|no (default = 150)|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#context) for compaction tasks.|no|

View File

@ -99,7 +99,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"type": "compact",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"ioConfig": <IO config>,
"dimensions" <custom dimensionsSpec>,
"segmentGranularity": <segment granularity after compaction>,
"targetCompactionSizeBytes": <target size of compacted segments>
@ -113,7 +113,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|DataSource name to be compacted|Yes|
|`interval`|Interval of segments to be compacted|Yes|
|`ioConfig`|ioConfig for compaction task. See [Compaction IOConfig](#compaction-ioconfig) for details.|Yes|
|`dimensionsSpec`|Custom dimensionsSpec. The compaction task will use this dimensionsSpec if specified instead of generating one. See below for more details.|No|
|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
@ -128,7 +128,13 @@ An example of compaction task is
{
"type" : "compact",
"dataSource" : "wikipedia",
"interval" : "2017-01-01/2018-01-01"
"ioConfig" : {
"type": "compact",
"inputSpec": {
"type": "interval",
"interval": "2017-01-01/2018-01-01"
}
}
}
```
@ -158,6 +164,31 @@ See [Roll-up](../ingestion/index.html#rollup) for more details.
You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
### Compaction IOConfig
The compaction IOConfig requires specifying `inputSpec` as seen below.
|Field|Description|Required|
|-----|-----------|--------|
|`type`|IOConfig type. Should be `compact`|Yes|
|`inputSpec`|Input specification|Yes|
There are two supported `inputSpec`s for now.
The interval `inputSpec` is:
|Field|Description|Required|
|-----|-----------|--------|
|`type`|inputSpec type. Should be `interval`|Yes|
|`interval`|Interval to compact|Yes|
The segments `inputSpec` is:
|Field|Description|Required|
|-----|-----------|--------|
|`type`|inputSpec type. Should be `segments`|Yes|
|`segments`|A list of segment IDs|Yes|
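
As an illustration (this example is not part of the change's docs), a compaction task using the `segments` inputSpec might look like the following; the segment IDs are hypothetical:

```
{
  "type": "compact",
  "dataSource": "wikipedia",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "segments",
      "segments": [
        "wikipedia_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-10-09T00:00:00.000Z",
        "wikipedia_2017-01-02T00:00:00.000Z_2017-01-03T00:00:00.000Z_2019-10-09T00:00:00.000Z"
      ]
    }
  }
}
```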
## Adding new data

View File

@ -24,11 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.List;
@ -38,9 +37,6 @@ public class HashedPartitionsSpecTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void havingTargetRowsPerSegmentOnly()
{
@ -130,7 +126,11 @@ public class HashedPartitionsSpecTest
Assert.fail(reasonPrefix + " did not throw exception");
}
catch (RuntimeException e) {
String expectedMessage = "At most one of " + first + " or " + second + " must be present";
final String expectedMessage = StringUtils.format(
"At most one of [Property{name='%s', value=100}] or [Property{name='%s', value=100}] must be present",
first,
second
);
Assert.assertThat(
reasonPrefix + " has wrong failure message",
e.getMessage(),

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.IOConfig;
import java.util.Objects;
/**
* {@link IOConfig} for {@link CompactionTask}.
*
* @see CompactionInputSpec
*/
@JsonTypeName("compact")
public class CompactionIOConfig implements IOConfig
{
private final CompactionInputSpec inputSpec;
@JsonCreator
public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
{
this.inputSpec = inputSpec;
}
@JsonProperty
public CompactionInputSpec getInputSpec()
{
return inputSpec;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionIOConfig that = (CompactionIOConfig) o;
return Objects.equals(inputSpec, that.inputSpec);
}
@Override
public int hashCode()
{
return Objects.hash(inputSpec);
}
@Override
public String toString()
{
return "CompactionIOConfig{" +
"inputSpec=" + inputSpec +
'}';
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
/**
* Input specification for compaction task.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = CompactionIntervalSpec.TYPE, value = CompactionIntervalSpec.class),
@Type(name = SpecificSegmentsSpec.TYPE, value = SpecificSegmentsSpec.class)
})
public interface CompactionInputSpec
{
/**
* Find the umbrella interval containing the specified input.
*/
Interval findInterval(String dataSource);
/**
* Validate the specified input against the most recent published segments.
* This method is used to check whether the specified input has gone stale.
*
* @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
*/
boolean validateSegments(List<DataSegment> latestSegments);
}
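
A hypothetical caller-side sketch (not from this diff) of how the two methods are meant to be used together: resolve the umbrella interval, look up the currently published segments for that interval, then refuse to compact if the spec has gone stale. The `findPublishedSegments` helper and the class wrapper are made-up stand-ins for the metadata lookup the real task performs.

```java
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.util.List;

public class InputSpecValidationSketch
{
  // Hypothetical stand-in for the coordinator/metadata lookup a real task performs.
  static List<DataSegment> findPublishedSegments(String dataSource, Interval interval)
  {
    throw new UnsupportedOperationException("lookup omitted in this sketch");
  }

  static List<DataSegment> checkAndGetSegments(CompactionInputSpec inputSpec, String dataSource)
  {
    // 1) Resolve the umbrella interval covered by the spec.
    final Interval interval = inputSpec.findInterval(dataSource);
    // 2) Fetch the segments currently published for that interval.
    final List<DataSegment> latestSegments = findPublishedSegments(dataSource, interval);
    // 3) Fail fast if the spec no longer matches what is published.
    if (!inputSpec.validateSegments(latestSegments)) {
      throw new ISE("Input spec[%s] is stale for dataSource[%s]", inputSpec, dataSource);
    }
    return latestSegments;
  }
}
```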

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Input spec that specifies an interval to compact. A SHA-256 hash of the sorted segment IDs can optionally be provided for segment validation.
*/
public class CompactionIntervalSpec implements CompactionInputSpec
{
public static final String TYPE = "interval";
private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;
@JsonCreator
public CompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
)
{
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
}
this.interval = interval;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Nullable
@JsonProperty
public String getSha256OfSortedSegmentIds()
{
return sha256OfSortedSegmentIds;
}
@Override
public Interval findInterval(String dataSource)
{
return interval;
}
@Override
public boolean validateSegments(List<DataSegment> latestSegments)
{
final Interval segmentsInterval = JodaUtils.umbrellaInterval(
latestSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
if (interval.overlaps(segmentsInterval)) {
if (sha256OfSortedSegmentIds != null) {
final String hashOfThem = SegmentUtils.hashIds(latestSegments);
return hashOfThem.equals(sha256OfSortedSegmentIds);
} else {
return true;
}
} else {
return false;
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionIntervalSpec that = (CompactionIntervalSpec) o;
return Objects.equals(interval, that.interval) &&
Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
}
@Override
public int hashCode()
{
return Objects.hash(interval, sha256OfSortedSegmentIds);
}
@Override
public String toString()
{
return "CompactionIntervalSpec{" +
"interval=" + interval +
", sha256OfSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
'}';
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -40,6 +41,8 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@ -53,7 +56,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Numbers;
@ -121,8 +123,7 @@ public class CompactionTask extends AbstractBatchIndexTask
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
private final Interval interval;
private final List<DataSegment> segments;
private final CompactionIOConfig ioConfig;
@Nullable
private final DimensionsSpec dimensionsSpec;
@Nullable
@ -176,8 +177,9 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("id") final String id,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Nullable final Interval interval,
@JsonProperty("segments") @Nullable final List<DataSegment> segments,
@JsonProperty("interval") @Deprecated @Nullable final Interval interval,
@JsonProperty("segments") @Deprecated @Nullable final List<DataSegment> segments,
@JsonProperty("ioConfig") @Nullable CompactionIOConfig ioConfig,
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@ -196,22 +198,32 @@ public class CompactionTask extends AbstractBatchIndexTask
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified");
Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null");
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("ioConfig", ioConfig),
new Property<>("interval", interval),
new Property<>("segments", segments)
)
);
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
}
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
@ -223,15 +235,9 @@ public class CompactionTask extends AbstractBatchIndexTask
}
@JsonProperty
public Interval getInterval()
public CompactionIOConfig getIoConfig()
{
return interval;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
return ioConfig;
}
@JsonProperty
@ -343,7 +349,7 @@ public class CompactionTask extends AbstractBatchIndexTask
}
if (indexTaskSpecs.isEmpty()) {
log.warn("Interval[%s] has no segments, nothing to do.", interval);
log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
return TaskStatus.failure(getId());
} else {
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
@ -758,36 +764,14 @@ public class CompactionTask extends AbstractBatchIndexTask
static class SegmentProvider
{
private final String dataSource;
private final CompactionInputSpec inputSpec;
private final Interval interval;
@Nullable
private final List<DataSegment> segments;
SegmentProvider(String dataSource, Interval interval)
SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
this.interval = Preconditions.checkNotNull(interval);
this.segments = null;
}
SegmentProvider(List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && !segments.isEmpty());
final String dataSource = segments.get(0).getDataSource();
Preconditions.checkArgument(
segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)),
"segments should have the same dataSource"
);
this.dataSource = dataSource;
this.segments = segments;
this.interval = JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
}
@Nullable
List<DataSegment> getSegments()
{
return segments;
this.inputSpec = inputSpec;
this.interval = inputSpec.findInterval(dataSource);
}
List<DataSegment> checkAndGetSegments(TaskActionClient actionClient) throws IOException
@ -804,24 +788,11 @@ public class CompactionTask extends AbstractBatchIndexTask
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
if (segments != null) {
Collections.sort(latestSegments);
Collections.sort(segments);
if (!latestSegments.equals(segments)) {
final List<DataSegment> unknownSegments = segments.stream()
.filter(segment -> !latestSegments.contains(segment))
.collect(Collectors.toList());
final List<DataSegment> missingSegments = latestSegments.stream()
.filter(segment -> !segments.contains(segment))
.collect(Collectors.toList());
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "There are unknown segments[%s] and missing segments[%s] in the spec.",
unknownSegments,
missingSegments
);
}
if (!inputSpec.validateSegments(latestSegments)) {
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "Possibly new segments would have been added or some segments have been unpublished."
);
}
return latestSegments;
}
@ -952,10 +923,7 @@ public class CompactionTask extends AbstractBatchIndexTask
private final RetryPolicyFactory retryPolicyFactory;
private final AppenderatorsManager appenderatorsManager;
@Nullable
private Interval interval;
@Nullable
private List<DataSegment> segments;
private CompactionIOConfig ioConfig;
@Nullable
private DimensionsSpec dimensionsSpec;
@Nullable
@ -994,13 +962,17 @@ public class CompactionTask extends AbstractBatchIndexTask
public Builder interval(Interval interval)
{
this.interval = interval;
return this;
return inputSpec(new CompactionIntervalSpec(interval, null));
}
public Builder segments(List<DataSegment> segments)
{
this.segments = segments;
return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
}
public Builder inputSpec(CompactionInputSpec inputSpec)
{
this.ioConfig = new CompactionIOConfig(inputSpec);
return this;
}
@ -1046,8 +1018,9 @@ public class CompactionTask extends AbstractBatchIndexTask
null,
null,
dataSource,
interval,
segments,
null,
null,
ioConfig,
null,
dimensionsSpec,
metricsSpec,

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class SpecificSegmentsSpec implements CompactionInputSpec
{
public static final String TYPE = "segments";
private final List<String> segments;
public static SpecificSegmentsSpec fromSegments(List<DataSegment> segments)
{
Preconditions.checkArgument(!segments.isEmpty(), "Empty segment list");
return new SpecificSegmentsSpec(
segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
);
}
@JsonCreator
public SpecificSegmentsSpec(@JsonProperty("segments") List<String> segments)
{
this.segments = segments;
// Sort segments to use in validateSegments.
Collections.sort(this.segments);
}
@JsonProperty
public List<String> getSegments()
{
return segments;
}
@Override
public Interval findInterval(String dataSource)
{
final List<SegmentId> segmentIds = segments
.stream()
.map(segment -> SegmentId.tryParse(dataSource, segment))
.collect(Collectors.toList());
return JodaUtils.umbrellaInterval(
segmentIds.stream().map(SegmentId::getInterval).collect(Collectors.toList())
);
}
@Override
public boolean validateSegments(List<DataSegment> latestSegments)
{
final List<String> thoseSegments = latestSegments
.stream()
.map(segment -> segment.getId().toString())
.sorted()
.collect(Collectors.toList());
return this.segments.equals(thoseSegments);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SpecificSegmentsSpec that = (SpecificSegmentsSpec) o;
return Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
return Objects.hash(segments);
}
@Override
public String toString()
{
return "SpecificSegmentsSpec{" +
"segments=" + segments +
'}';
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
public class ClientCompactQuerySerdeTest
{
private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
.getRowIngestionMetersFactory();
private static final CoordinatorClient COORDINATOR_CLIENT = new CoordinatorClient(null, null);
private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
final ClientCompactQuery query = new ClientCompactQuery(
"datasource",
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
)
),
null,
new ClientCompactQueryTuningConfig(
100,
40000,
2000L,
30000L,
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
1000L
),
new HashMap<>()
);
final byte[] json = mapper.writeValueAsBytes(query);
final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class);
Assert.assertEquals(query.getDataSource(), task.getDataSource());
Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
Assert.assertEquals(
query.getIoConfig().getInputSpec().getInterval(),
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getInterval()
);
Assert.assertEquals(
query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
);
Assert.assertEquals(query.getTargetCompactionSizeBytes(), task.getTargetCompactionSizeBytes());
Assert.assertEquals(
query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory()
);
Assert.assertEquals(
query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory()
);
Assert.assertEquals(
query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment()
);
Assert.assertEquals(
query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows()
);
Assert.assertEquals(
query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec()
);
Assert.assertEquals(
query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout()
);
Assert.assertEquals(query.getContext(), task.getContext());
}
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
{
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector,
objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector,
objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
GuiceInjectableValues injectableValues = new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
}
)
)
);
objectMapper.setInjectableValues(injectableValues);
return objectMapper;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class CompactionInputSpecTest
{
private static final String DATASOURCE = "datasource";
private static final List<DataSegment> SEGMENTS = prepareSegments();
private static Interval INTERVAL = JodaUtils.umbrellaInterval(
SEGMENTS.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
@Parameters
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{
new CompactionIntervalSpec(
INTERVAL,
SegmentUtils.hashIds(SEGMENTS)
)
},
new Object[]{
new SpecificSegmentsSpec(
SEGMENTS.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
)
}
);
}
private static List<DataSegment> prepareSegments()
{
return IntStream.range(0, 20)
.mapToObj(i -> newSegment(Intervals.of("2019-01-%02d/2019-01-%02d", i + 1, i + 2)))
.collect(Collectors.toList());
}
private static DataSegment newSegment(Interval interval)
{
return new DataSegment(
DATASOURCE,
interval,
"version",
null,
null,
null,
null,
9,
10
);
}
private final CompactionInputSpec inputSpec;
public CompactionInputSpecTest(CompactionInputSpec inputSpec)
{
this.inputSpec = inputSpec;
}
@Test
public void testFindInterval()
{
Assert.assertEquals(INTERVAL, inputSpec.findInterval(DATASOURCE));
}
@Test
public void testValidateSegments()
{
Assert.assertTrue(inputSpec.validateSegments(SEGMENTS));
}
@Test
public void testValidateWrongSegments()
{
final List<DataSegment> someSegmentIsMissing = new ArrayList<>(SEGMENTS);
someSegmentIsMissing.remove(0);
Assert.assertFalse(inputSpec.validateSegments(someSegmentIsMissing));
final List<DataSegment> someSegmentIsUnknown = new ArrayList<>(SEGMENTS);
someSegmentIsUnknown.add(newSegment(Intervals.of("2018-01-01/2018-01-02")));
Assert.assertFalse(inputSpec.validateSegments(someSegmentIsUnknown));
}
}

View File

@ -81,6 +81,7 @@ import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.SimpleQueryableIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.BitmapIndex;
@ -150,15 +151,17 @@ public class CompactionTaskTest
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
.getRowIngestionMetersFactory();
private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP);
private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private static Map<String, DimensionSchema> DIMENSIONS;
private static List<AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
private static Map<DataSegment, File> segmentMap = new HashMap<>();
private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap);
private static AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
private TaskToolbox toolbox;
private SegmentLoaderFactory segmentLoaderFactory;
@ -207,7 +210,7 @@ public class CompactionTaskTest
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
segmentMap.put(
SEGMENT_MAP.put(
new DataSegment(
DATA_SOURCE,
segmentInterval,
@ -222,7 +225,7 @@ public class CompactionTaskTest
new File("file_" + i)
);
}
SEGMENTS = new ArrayList<>(segmentMap.keySet());
SEGMENTS = new ArrayList<>(SEGMENT_MAP.keySet());
}
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
@ -244,10 +247,10 @@ public class CompactionTaskTest
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
binder.bind(CoordinatorClient.class).toInstance(coordinatorClient);
binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(appenderatorsManager);
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
}
)
)
@ -312,13 +315,13 @@ public class CompactionTaskTest
@Before
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap);
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
toolbox = new TestTaskToolbox(
new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
segmentMap
SEGMENT_MAP
);
segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper);
segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
}
@Test
@ -326,23 +329,25 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
objectMapper,
OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory,
coordinatorClient,
ROW_INGESTION_METERS_FACTORY,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
RETRY_POLICY_FACTORY,
APPENDERATORS_MANAGER
);
final CompactionTask task = builder
.interval(COMPACTION_INTERVAL)
.inputSpec(
new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
)
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testContext"))
.build();
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@ -351,14 +356,14 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
objectMapper,
OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory,
coordinatorClient,
ROW_INGESTION_METERS_FACTORY,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
RETRY_POLICY_FACTORY,
APPENDERATORS_MANAGER
);
final CompactionTask task = builder
.segments(SEGMENTS)
@ -366,8 +371,8 @@ public class CompactionTaskTest
.context(ImmutableMap.of("testKey", "testContext"))
.build();
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@ -376,14 +381,14 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
objectMapper,
OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory,
coordinatorClient,
ROW_INGESTION_METERS_FACTORY,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
RETRY_POLICY_FACTORY,
APPENDERATORS_MANAGER
);
final CompactionTask task = builder
@ -401,8 +406,8 @@ public class CompactionTaskTest
.context(ImmutableMap.of("testKey", "testVal"))
.build();
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@ -410,10 +415,9 @@ public class CompactionTaskTest
{
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getDataSource(), actual.getDataSource());
Assert.assertEquals(expected.getInterval(), actual.getInterval());
Assert.assertEquals(expected.getSegments(), actual.getSegments());
Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig());
Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
Assert.assertTrue(Arrays.equals(expected.getMetricsSpec(), actual.getMetricsSpec()));
Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec());
Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes());
Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
Assert.assertEquals(expected.getContext(), actual.getContext());
@ -424,15 +428,15 @@ public class CompactionTaskTest
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -484,15 +488,15 @@ public class CompactionTaskTest
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -545,15 +549,15 @@ public class CompactionTaskTest
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -606,15 +610,15 @@ public class CompactionTaskTest
);
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -667,15 +671,15 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
customSpec,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
ingestionSpecs.sort(
@ -708,15 +712,15 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
customMetricsSpec,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -742,15 +746,15 @@ public class CompactionTaskTest
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(SEGMENTS),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -782,15 +786,15 @@ public class CompactionTaskTest
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
}
@ -805,15 +809,15 @@ public class CompactionTaskTest
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
}
@ -825,14 +829,14 @@ public class CompactionTaskTest
final Builder builder = new Builder(
DATA_SOURCE,
objectMapper,
OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory,
coordinatorClient,
ROW_INGESTION_METERS_FACTORY,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
RETRY_POLICY_FACTORY,
APPENDERATORS_MANAGER
);
final CompactionTask task = builder
@ -874,15 +878,15 @@ public class CompactionTaskTest
expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with");
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(6L, tuningConfig),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
}
@ -891,15 +895,15 @@ public class CompactionTaskTest
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
new PeriodGranularity(Period.months(3), null, null),
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@ -926,15 +930,15 @@ public class CompactionTaskTest
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper,
coordinatorClient,
OBJECT_MAPPER,
COORDINATOR_CLIENT,
segmentLoaderFactory,
retryPolicyFactory
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -962,7 +966,7 @@ public class CompactionTaskTest
final Map<File, QueryableIndex> queryableIndexMap = indexIO.getQueryableIndexMap();
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Entry<DataSegment, File> entry : segmentMap.entrySet()) {
for (Entry<DataSegment, File> entry : SEGMENT_MAP.entrySet()) {
final DataSegment segment = entry.getKey();
final File file = entry.getValue();
segments.add(Pair.of(Preconditions.checkNotNull(queryableIndexMap.get(file)), segment));
@ -1087,7 +1091,7 @@ public class CompactionTaskTest
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
final InputRowParser parser = OBJECT_MAPPER.convertValue(dataSchema.getParser(), InputRowParser.class);
Assert.assertTrue(parser instanceof TransformingInputRowParser);
Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
@ -1181,7 +1185,7 @@ public class CompactionTaskTest
null,
null,
null,
new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
null,
null,
null,

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SpecificSegmentsSpecTest
{
@Test
public void createTest()
{
final List<DataSegment> segments = IntStream
.range(0, 20)
.mapToObj(i -> newSegment(Intervals.of("2019-01-%02d/2019-01-%02d", i + 1, i + 2)))
.collect(Collectors.toList());
final List<String> expectedSegmentIds = segments
.stream()
.map(segment -> segment.getId().toString())
.collect(Collectors.toList());
Collections.shuffle(segments, ThreadLocalRandom.current());
final SpecificSegmentsSpec spec = SpecificSegmentsSpec.fromSegments(segments);
Assert.assertEquals(expectedSegmentIds, spec.getSegments());
}
private static DataSegment newSegment(Interval interval)
{
return new DataSegment(
"datasource",
interval,
"version",
null,
null,
null,
null,
9,
10
);
}
}

View File

@ -21,11 +21,8 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -36,8 +33,7 @@ import java.util.Objects;
public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final List<DataSegment> segments;
private final Interval interval;
private final ClientCompactionIOConfig ioConfig;
@Nullable
private final Long targetCompactionSizeBytes;
private final ClientCompactQueryTuningConfig tuningConfig;
@ -46,16 +42,14 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") @Nullable final Interval interval,
@JsonProperty("segments") @Nullable final List<DataSegment> segments,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.dataSource = dataSource;
this.segments = segments;
this.interval = interval;
this.ioConfig = ioConfig;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.context = context;
@ -76,15 +70,9 @@ public class ClientCompactQuery implements ClientQuery
}
@JsonProperty
public List<DataSegment> getSegments()
public ClientCompactionIOConfig getIoConfig()
{
return segments;
}
@JsonProperty
public Interval getInterval()
{
return interval;
return ioConfig;
}
@JsonProperty
@ -117,8 +105,7 @@ public class ClientCompactQuery implements ClientQuery
}
ClientCompactQuery that = (ClientCompactQuery) o;
return Objects.equals(dataSource, that.dataSource) &&
Objects.equals(segments, that.segments) &&
Objects.equals(interval, that.interval) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
@ -127,23 +114,15 @@ public class ClientCompactQuery implements ClientQuery
@Override
public int hashCode()
{
return Objects.hash(
dataSource,
segments,
interval,
targetCompactionSizeBytes,
tuningConfig,
context
);
return Objects.hash(dataSource, ioConfig, targetCompactionSizeBytes, tuningConfig, context);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", interval=" + interval +
", ioConfig=" + ioConfig +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", context=" + context +
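For illustration only, a minimal sketch (not part of this commit; "segments" and "tuningConfig" are assumed local variables) of how a caller might build the refactored query, using the IOConfig types added later in this commit:
// Hypothetical usage sketch: the explicit segment list and interval are replaced by an ioConfig.
ClientCompactQuery query = new ClientCompactQuery(
    "wikipedia",                                                    // hypothetical datasource name
    new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
    null,                                                           // targetCompactionSizeBytes
    tuningConfig,
    ImmutableMap.of("testKey", "testVal")                           // arbitrary task context
);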

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* IOConfig for {@link ClientCompactQuery}.
*
* Should be kept in sync with org.apache.druid.indexing.common.task.CompactionIOConfig.
*/
public class ClientCompactionIOConfig
{
private static final String TYPE = "compact";
private final ClientCompactionIntervalSpec inputSpec;
@JsonCreator
public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec)
{
this.inputSpec = inputSpec;
}
@JsonProperty
public String getType()
{
return TYPE;
}
@JsonProperty
public ClientCompactionIntervalSpec getInputSpec()
{
return inputSpec;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
return Objects.equals(inputSpec, that.inputSpec);
}
@Override
public int hashCode()
{
return Objects.hash(inputSpec);
}
@Override
public String toString()
{
return "ClientCompactionIOConfig{" +
"inputSpec=" + inputSpec +
'}';
}
}
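As a sketch (assuming a caller that already knows the target interval; the interval literal is hypothetical), the IOConfig can also wrap an interval-only spec, using the ClientCompactionIntervalSpec defined in the next file:
// Hypothetical sketch: an interval-only input spec is valid because
// sha256OfSortedSegmentIds is @Nullable in ClientCompactionIntervalSpec.
ClientCompactionIOConfig ioConfig = new ClientCompactionIOConfig(
    new ClientCompactionIntervalSpec(Intervals.of("2019-01-01/2019-02-01"), null)
);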

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* InputSpec for {@link ClientCompactionIOConfig}.
*
* Should be kept in sync with org.apache.druid.indexing.common.task.CompactionIntervalSpec.
*/
public class ClientCompactionIntervalSpec
{
private static final String TYPE = "interval";
private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;
public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
{
return new ClientCompactionIntervalSpec(
JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
),
SegmentUtils.hashIds(segments)
);
}
@JsonCreator
public ClientCompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
)
{
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
}
this.interval = interval;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
}
@JsonProperty
public String getType()
{
return TYPE;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Nullable
@JsonProperty
public String getSha256OfSortedSegmentIds()
{
return sha256OfSortedSegmentIds;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactionIntervalSpec that = (ClientCompactionIntervalSpec) o;
return Objects.equals(interval, that.interval) &&
Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
}
@Override
public int hashCode()
{
return Objects.hash(interval, sha256OfSortedSegmentIds);
}
@Override
public String toString()
{
return "ClientCompactionIntervalSpec{" +
"interval=" + interval +
", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
'}';
}
}
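A minimal usage sketch (not part of the diff; "segments" is an assumed List&lt;DataSegment&gt; for one datasource) of how fromSegments condenses an explicit segment list into the interval-plus-hash form that this commit switches to:
// Hypothetical sketch: fromSegments() keeps only the umbrella interval plus a SHA-256
// over the sorted segment IDs, instead of listing every segment in the task spec.
ClientCompactionIntervalSpec inputSpec = ClientCompactionIntervalSpec.fromSegments(segments);
Interval umbrella = inputSpec.getInterval();                  // covers all input segments
String fingerprint = inputSpec.getSha256OfSortedSegmentIds(); // hash of the segment set
ClientCompactionIOConfig ioConfig = new ClientCompactionIOConfig(inputSpec);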

View File

@ -93,8 +93,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
return runTask(
new ClientCompactQuery(
dataSource,
null,
segments,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
targetCompactionSizeBytes,
tuningConfig,
context

View File

@ -38,7 +38,6 @@ public class DataSourceCompactionConfig
// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024;
private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
private final String dataSource;
@ -50,7 +49,6 @@ public class DataSourceCompactionConfig
// RemoteTaskRunnerConfig.maxZnodeBytes.
@Nullable
private final Integer maxRowsPerSegment;
private final int maxNumSegmentsToCompact;
private final Period skipOffsetFromLatest;
private final UserCompactTuningConfig tuningConfig;
private final Map<String, Object> taskContext;
@ -62,7 +60,6 @@ public class DataSourceCompactionConfig
@JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
@ -81,17 +78,9 @@ public class DataSourceCompactionConfig
tuningConfig
);
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
? DEFAULT_NUM_INPUT_SEGMENTS
: maxNumSegmentsToCompact;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.taskContext = taskContext;
Preconditions.checkArgument(
this.maxNumSegmentsToCompact > 1,
"numTargetCompactionSegments should be larger than 1"
);
}
/**
@ -157,12 +146,6 @@ public class DataSourceCompactionConfig
return inputSegmentSizeBytes;
}
@JsonProperty
public int getMaxNumSegmentsToCompact()
{
return maxNumSegmentsToCompact;
}
@JsonProperty
@Nullable
public Long getTargetCompactionSizeBytes()
@ -209,7 +192,6 @@ public class DataSourceCompactionConfig
DataSourceCompactionConfig that = (DataSourceCompactionConfig) o;
return taskPriority == that.taskPriority &&
inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
maxNumSegmentsToCompact == that.maxNumSegmentsToCompact &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
@ -225,7 +207,6 @@ public class DataSourceCompactionConfig
taskPriority,
inputSegmentSizeBytes,
targetCompactionSizeBytes,
maxNumSegmentsToCompact,
skipOffsetFromLatest,
tuningConfig,
taskContext

View File

@ -28,8 +28,6 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
@ -39,7 +37,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -94,22 +91,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval;
if (compactQuery.getSegments() != null) {
interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
} else if (compactQuery.getInterval() != null) {
interval = compactQuery.getInterval();
} else {
throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
}
final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval();
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
@ -218,7 +200,6 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
return stats;
}
@Nullable
public long getRemainingSegmentSizeBytes(String dataSource)
{
return remainingSegmentSizeBytes.getLong(dataSource);

View File

@ -247,19 +247,17 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()) {
final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
targetCompactionSizeBytes,
candidates.segments
);
if (isCompactibleSize && isCompactibleNum && needsCompaction) {
if (isCompactibleSize && needsCompaction) {
return candidates;
} else {
if (!isCompactibleSize) {
@ -272,18 +270,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
inputSegmentSize
);
}
if (!isCompactibleNum) {
log.warn(
"Number of segments[%d] for datasource[%s] and interval[%s] is larger than "
+ "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
+ "segments, consider increasing 'numTargetCompactionSegments' and "
+ "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.",
candidates.getNumSegments(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
maxNumSegmentsToCompact
);
}
if (!needsCompaction) {
log.warn(
"Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "

View File

@ -51,7 +51,6 @@ public class DataSourceCompactionConfigTest
500L,
100L,
null,
20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@ -64,7 +63,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@ -79,7 +77,6 @@ public class DataSourceCompactionConfigTest
500L,
null,
30,
20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@ -92,7 +89,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@ -118,7 +114,6 @@ public class DataSourceCompactionConfigTest
500L,
null,
null,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
@ -138,7 +133,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@ -157,7 +151,6 @@ public class DataSourceCompactionConfigTest
500L,
10000L,
1000,
20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@ -177,7 +170,6 @@ public class DataSourceCompactionConfigTest
500L,
10000L,
null,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
@ -200,7 +192,6 @@ public class DataSourceCompactionConfigTest
500L,
null,
10000,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
@ -221,7 +212,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());

View File

@ -377,7 +377,6 @@ public class DruidCoordinatorSegmentCompactorTest
50L,
20L,
null,
null,
new Period("PT1H"), // smaller than segment interval
null,
null

View File

@ -56,7 +56,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P2D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -81,7 +81,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1M"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -114,41 +114,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H1M"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
// larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
)
),
Collections.emptyMap()
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"),
false
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
Intervals.of("2017-11-15T06:00:00/2017-11-15T07:00:00"),
true
);
}
@Test
public void testSmallNumTargetCompactionSegments()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 5, new Period("PT1H1M"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -181,7 +147,7 @@ public class NewestSegmentFirstPolicyTest
public void testHugeShard()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -231,7 +197,7 @@ public class NewestSegmentFirstPolicyTest
public void testManySegmentsPerShard()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -283,70 +249,6 @@ public class NewestSegmentFirstPolicyTest
Assert.assertEquals(Intervals.of("2017-12-03T11:00:00/2017-12-03T12:00:00"), lastInterval);
}
@Test
public void testManySegmentsPerShard2()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-12-04T11:00:00/2017-12-05T05:00:00"),
new Period("PT1H"),
200,
150
),
new SegmentGenerateSpec(
Intervals.of("2017-12-04T06:00:00/2017-12-04T11:00:00"),
new Period("PT1H"),
375,
80
),
new SegmentGenerateSpec(
Intervals.of("2017-12-03T18:00:00/2017-12-04T06:00:00"),
new Period("PT12H"),
257000,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-03T11:00:00/2017-12-03T18:00:00"),
new Period("PT1H"),
200,
150
),
new SegmentGenerateSpec(
Intervals.of("2017-12-02T19:00:00/2017-12-03T11:00:00"),
new Period("PT16H"),
257000,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-02T11:00:00/2017-12-02T19:00:00"),
new Period("PT1H"),
200,
150
),
new SegmentGenerateSpec(
Intervals.of("2017-12-01T18:00:00/2017-12-02T11:00:00"),
new Period("PT17H"),
257000,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-01T09:00:00/2017-12-01T18:00:00"),
new Period("PT1H"),
200,
150
)
)
),
Collections.emptyMap()
);
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testSkipUnknownDataSource()
{
@ -355,9 +257,9 @@ public class NewestSegmentFirstPolicyTest
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(
unknownDataSource,
createCompactionConfig(10000, 100, new Period("P2D")),
createCompactionConfig(10000, new Period("P2D")),
DATA_SOURCE,
createCompactionConfig(10000, 100, new Period("P2D"))
createCompactionConfig(10000, new Period("P2D"))
),
ImmutableMap.of(
DATA_SOURCE,
@ -382,7 +284,7 @@ public class NewestSegmentFirstPolicyTest
public void testIgnoreSingleSegmentToCompact()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -431,7 +333,7 @@ public class NewestSegmentFirstPolicyTest
)
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, new Period("P0D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -461,7 +363,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -482,7 +384,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -495,7 +397,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -535,7 +437,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@ -668,7 +570,6 @@ public class NewestSegmentFirstPolicyTest
private DataSourceCompactionConfig createCompactionConfig(
long targetCompactionSizeBytes,
int numTargetCompactionSegments,
Period skipOffsetFromLatest
)
{
@ -678,7 +579,6 @@ public class NewestSegmentFirstPolicyTest
targetCompactionSizeBytes,
targetCompactionSizeBytes,
null,
numTargetCompactionSegments,
skipOffsetFromLatest,
null,
null

View File

@ -167,119 +167,6 @@ exports[`compaction dialog matches snapshot 1`] = `
</div>
</div>
</div>
<div
class="bp3-form-group"
>
<label
class="bp3-label"
>
Max num segments to compact
<span
class="bp3-text-muted"
>
<span
class="bp3-popover-wrapper"
>
<span
class="bp3-popover-target"
>
<span
class="bp3-icon bp3-icon-info-sign"
icon="info-sign"
>
<svg
data-icon="info-sign"
height="14"
viewBox="0 0 16 16"
width="14"
>
<desc>
info-sign
</desc>
<path
d="M8 0C3.58 0 0 3.58 0 8s3.58 8 8 8 8-3.58 8-8-3.58-8-8-8zM7 3h2v2H7V3zm3 10H6v-1h1V7H6V6h3v6h1v1z"
fill-rule="evenodd"
/>
</svg>
</span>
</span>
</span>
</span>
</label>
<div
class="bp3-form-content"
>
<div
class="bp3-control-group bp3-fill bp3-numeric-input"
>
<div
class="bp3-input-group"
>
<input
autocomplete="off"
class="bp3-input"
min="0"
style="padding-right: 10px;"
type="text"
value="150"
/>
</div>
<div
class="bp3-button-group bp3-vertical bp3-fixed"
>
<button
class="bp3-button"
type="button"
>
<span
class="bp3-icon bp3-icon-chevron-up"
icon="chevron-up"
>
<svg
data-icon="chevron-up"
height="16"
viewBox="0 0 16 16"
width="16"
>
<desc>
chevron-up
</desc>
<path
d="M12.71 9.29l-4-4C8.53 5.11 8.28 5 8 5s-.53.11-.71.29l-4 4a1.003 1.003 0 001.42 1.42L8 7.41l3.29 3.29c.18.19.43.3.71.3a1.003 1.003 0 00.71-1.71z"
fill-rule="evenodd"
/>
</svg>
</span>
</button>
<button
class="bp3-button"
type="button"
>
<span
class="bp3-icon bp3-icon-chevron-down"
icon="chevron-down"
>
<svg
data-icon="chevron-down"
height="16"
viewBox="0 0 16 16"
width="16"
>
<desc>
chevron-down
</desc>
<path
d="M12 5c-.28 0-.53.11-.71.29L8 8.59l-3.29-3.3a1.003 1.003 0 00-1.42 1.42l4 4c.18.18.43.29.71.29s.53-.11.71-.29l4-4A1.003 1.003 0 0012 5z"
fill-rule="evenodd"
/>
</svg>
</span>
</button>
</div>
</div>
</div>
</div>
<div
class="bp3-form-group"
>

View File

@ -96,18 +96,6 @@ export class CompactionDialog extends React.PureComponent<
</p>
),
},
{
name: 'maxNumSegmentsToCompact',
type: 'number',
defaultValue: 150,
info: (
<p>
Maximum number of segments to compact together per compaction task. Since a time
chunk must be processed in its entirety, if a time chunk has a total number of
segments greater than this parameter, compaction will not run for that time chunk.
</p>
),
},
{
name: 'skipOffsetFromLatest',
type: 'string',

View File

@ -823,6 +823,7 @@ listDelimiter
timestampSpec
- ../docs/ingestion/data-management.md
1GB
IOConfig
compactionTask
compactionTasks
ingestSegmentFirehose