mirror of https://github.com/apache/druid.git
Support segmentGranularity for auto-compaction (#10843)
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* resolve conflict
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* fix tests
* fix more tests
* fix checkstyle
* add unit tests
* fix checkstyle
* fix checkstyle
* fix checkstyle
* add unit tests
* add integration tests
* fix checkstyle
* fix checkstyle
* fix failing tests
* address comments
* address comments
* fix tests
* fix tests
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
This commit is contained in: parent e684b83e29, commit 6541178c21
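In short, this change threads a granularitySpec (for now carrying only segmentGranularity) from the coordinator's auto-compaction config through the client compaction task query and the CompactionTask, and records it in the CompactionState stored on compacted segments. As a minimal sketch of the operator-facing side, an auto-compaction entry for a datasource could now carry a granularity spec roughly like the following; the values are illustrative, and per the validation added to DataSourceCompactionConfig only a uniform spec with the default rollup and default query granularity is accepted:

{
  "dataSource": "wikipedia_index_test",
  "skipOffsetFromLatest": "P1D",
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "DAY"
  }
}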
@@ -99,6 +99,7 @@ public class NewestSegmentFirstPolicyBenchmark
null,
null,
null,
null,
null
)
);
@@ -43,15 +43,20 @@ public class CompactionState
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
// org.apache.druid.segment.indexing.granularity.GranularitySpec cannot be used here because it's in the
// 'server' module which has a dependency on the 'core' module where this class is.
private final Map<String, Object> granularitySpec;

@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec
@JsonProperty("indexSpec") Map<String, Object> indexSpec,
@JsonProperty("granularitySpec") Map<String, Object> granularitySpec
)
{
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec;
this.granularitySpec = granularitySpec;
}

@JsonProperty

@@ -66,6 +71,12 @@ public class CompactionState
return indexSpec;
}

@JsonProperty
public Map<String, Object> getGranularitySpec()
{
return granularitySpec;
}

@Override
public boolean equals(Object o)
{

@@ -77,13 +88,14 @@ public class CompactionState
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec);
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(granularitySpec, that.granularitySpec);
}

@Override
public int hashCode()
{
return Objects.hash(partitionsSpec, indexSpec);
return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
}

@Override

@@ -92,6 +104,7 @@ public class CompactionState
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
", granularitySpec=" + granularitySpec +
'}';
}
}
@@ -58,7 +58,7 @@ public interface PartitionChunk<T> extends Comparable<PartitionChunk<T>>
* Returns true if this chunk is the end of the partition. Most commonly, that means it represents the range
* [X, infinity] for some concrete X.
*
* @return true if the chunk is the beginning of the partition
* @return true if the chunk is the end of the partition
*/
boolean isEnd();
@@ -120,6 +120,7 @@ public class DataSegmentTest
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
ImmutableMap.of(),
ImmutableMap.of()
),
TEST_VERSION,

@@ -231,7 +232,8 @@ public class DataSegmentTest
{
final CompactionState compactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
Collections.singletonMap("test", "map")
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);
final DataSegment segment1 = DataSegment.builder()
.dataSource("foo")
@@ -473,12 +473,14 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
IndexTuningConfig tuningConfig
IndexTuningConfig tuningConfig,
GranularitySpec granularitySpec
)
{
if (storeCompactionState) {
final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap);
final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
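With compactionStateAnnotateFunction now taking the task's GranularitySpec, each published segment's lastCompactionState records the granularity spec as a plain map alongside the partitions spec and index spec. A rough sketch of the resulting shape, with illustrative values (the exact serialized form is whatever Jackson produces for the task's UniformGranularitySpec and IndexSpec):

{
  "partitionsSpec": { "type": "dynamic" },
  "indexSpec": { },
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "HOUR",
    "queryGranularity": "NONE",
    "rollup": true
  }
}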
@@ -141,6 +141,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final GranularitySpec granularitySpec;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore
private final SegmentProvider segmentProvider;

@@ -172,7 +174,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity,
@JsonProperty("granularitySpec") @Nullable final GranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,

@@ -202,6 +205,16 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec(
segmentGranularity,
null,
null,
null
);
} else {
this.granularitySpec = granularitySpec;
}
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);

@@ -288,7 +301,14 @@ public class CompactionTask extends AbstractBatchIndexTask
@Override
public Granularity getSegmentGranularity()
{
return segmentGranularity;
return granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
}

@JsonProperty
@Nullable
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}

@Nullable

@@ -348,7 +368,7 @@ public class CompactionTask extends AbstractBatchIndexTask
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
segmentGranularity,
getSegmentGranularity(),
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory

@@ -892,6 +912,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private Granularity segmentGranularity;
@Nullable
private GranularitySpec granularitySpec;
@Nullable
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;

@@ -941,6 +963,12 @@ public class CompactionTask extends AbstractBatchIndexTask
return this;
}

public Builder granularitySpec(GranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
}

public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;

@@ -966,6 +994,7 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec,
metricsSpec,
segmentGranularity,
granularitySpec,
tuningConfig,
context,
segmentLoaderFactory,
@@ -904,7 +904,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig()
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
);

// Probably we can publish atomicUpdateGroup along with segments.
@@ -929,7 +929,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig()
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
);
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
@ -28,6 +28,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
|
|||
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
|
||||
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
|
||||
import org.apache.druid.client.indexing.ClientTaskQuery;
|
||||
import org.apache.druid.client.indexing.IndexingServiceClient;
|
||||
|
@ -45,11 +46,13 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
|
|||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.HumanReadableBytes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.segment.IndexSpec;
|
||||
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
|
||||
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
|
||||
import org.apache.druid.segment.data.CompressionStrategy;
|
||||
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
|
||||
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
||||
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
|
||||
|
@ -113,6 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
|
|||
1000,
|
||||
100
|
||||
),
|
||||
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
|
||||
ImmutableMap.of("key", "value")
|
||||
);
|
||||
|
||||
|
@ -186,6 +190,18 @@ public class ClientCompactionTaskQuerySerdeTest
|
|||
query.getTuningConfig().getTotalNumMergeTasks().intValue(),
|
||||
task.getTuningConfig().getTotalNumMergeTasks()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
query.getGranularitySpec().getQueryGranularity(),
|
||||
task.getGranularitySpec().getQueryGranularity()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
query.getGranularitySpec().getSegmentGranularity(),
|
||||
task.getGranularitySpec().getSegmentGranularity()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
query.getGranularitySpec().isRollup(),
|
||||
task.getGranularitySpec().isRollup()
|
||||
);
|
||||
Assert.assertEquals(query.getContext(), task.getContext());
|
||||
}
|
||||
|
||||
|
@ -243,6 +259,7 @@ public class ClientCompactionTaskQuerySerdeTest
|
|||
null
|
||||
)
|
||||
)
|
||||
.granularitySpec(new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, null))
|
||||
.build();
|
||||
|
||||
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
|
||||
|
@ -284,6 +301,7 @@ public class ClientCompactionTaskQuerySerdeTest
|
|||
1000,
|
||||
100
|
||||
),
|
||||
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
|
||||
new HashMap<>()
|
||||
);
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRunParallelWithDynamicPartitioningMatchCompactionState()
|
||||
public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws Exception
|
||||
{
|
||||
runIndexTask(null, true);
|
||||
|
||||
|
@ -144,22 +144,33 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
|
|||
.build();
|
||||
|
||||
final Set<DataSegment> compactedSegments = runTask(compactionTask);
|
||||
final CompactionState expectedState = new CompactionState(
|
||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
|
||||
);
|
||||
for (DataSegment segment : compactedSegments) {
|
||||
Assert.assertSame(
|
||||
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
|
||||
segment.getShardSpec().getClass()
|
||||
);
|
||||
// Expect compaction state to exist as store compaction state by default
|
||||
CompactionState expectedState = new CompactionState(
|
||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
|
||||
getObjectMapper().readValue(
|
||||
getObjectMapper().writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.NONE,
|
||||
true,
|
||||
ImmutableList.of(segment.getInterval())
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedState, segment.getLastCompactionState());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunParallelWithHashPartitioningMatchCompactionState()
|
||||
public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exception
|
||||
{
|
||||
// Hash partitioning is not supported with segment lock yet
|
||||
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
|
||||
|
@ -176,19 +187,30 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
|
|||
.build();
|
||||
|
||||
final Set<DataSegment> compactedSegments = runTask(compactionTask);
|
||||
final CompactionState expectedState = new CompactionState(
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
|
||||
);
|
||||
for (DataSegment segment : compactedSegments) {
|
||||
// Expect compaction state to exist as store compaction state by default
|
||||
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
|
||||
CompactionState expectedState = new CompactionState(
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
|
||||
getObjectMapper().readValue(
|
||||
getObjectMapper().writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.NONE,
|
||||
true,
|
||||
ImmutableList.of(segment.getInterval())
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedState, segment.getLastCompactionState());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunParallelWithRangePartitioning()
|
||||
public void testRunParallelWithRangePartitioning() throws Exception
|
||||
{
|
||||
// Range partitioning is not supported with segment lock yet
|
||||
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
|
||||
|
@ -205,19 +227,30 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
|
|||
.build();
|
||||
|
||||
final Set<DataSegment> compactedSegments = runTask(compactionTask);
|
||||
final CompactionState expectedState = new CompactionState(
|
||||
new SingleDimensionPartitionsSpec(7, null, "dim", false),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
|
||||
);
|
||||
for (DataSegment segment : compactedSegments) {
|
||||
// Expect compaction state to exist as store compaction state by default
|
||||
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
|
||||
CompactionState expectedState = new CompactionState(
|
||||
new SingleDimensionPartitionsSpec(7, null, "dim", false),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
|
||||
getObjectMapper().readValue(
|
||||
getObjectMapper().writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.NONE,
|
||||
true,
|
||||
ImmutableList.of(segment.getInterval())
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedState, segment.getLastCompactionState());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunParallelWithRangePartitioningWithSingleTask()
|
||||
public void testRunParallelWithRangePartitioningWithSingleTask() throws Exception
|
||||
{
|
||||
// Range partitioning is not supported with segment lock yet
|
||||
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
|
||||
|
@ -234,13 +267,24 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
|
|||
.build();
|
||||
|
||||
final Set<DataSegment> compactedSegments = runTask(compactionTask);
|
||||
final CompactionState expectedState = new CompactionState(
|
||||
new SingleDimensionPartitionsSpec(7, null, "dim", false),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
|
||||
);
|
||||
for (DataSegment segment : compactedSegments) {
|
||||
// Expect compaction state to exist as store compaction state by default
|
||||
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
|
||||
CompactionState expectedState = new CompactionState(
|
||||
new SingleDimensionPartitionsSpec(7, null, "dim", false),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
|
||||
getObjectMapper().readValue(
|
||||
getObjectMapper().writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.NONE,
|
||||
true,
|
||||
ImmutableList.of(segment.getInterval())
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedState, segment.getLastCompactionState());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.Pair;
|
|||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
|
@ -88,7 +89,6 @@ import org.joda.time.Interval;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
@ -131,9 +131,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
0
|
||||
);
|
||||
|
||||
// Expecte compaction state to exist after compaction as we store compaction state by default
|
||||
private static CompactionState DEFAULT_COMPACTION_STATE;
|
||||
|
||||
private static final List<String> TEST_ROWS = ImmutableList.of(
|
||||
"2014-01-01T00:00:10Z,a,1\n",
|
||||
"2014-01-01T00:00:10Z,b,2\n",
|
||||
|
@ -186,14 +183,26 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
this.lockGranularity = lockGranularity;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws JsonProcessingException
|
||||
public static CompactionState getDefaultCompactionState(Granularity segmentGranularity,
|
||||
Granularity queryGranularity,
|
||||
List<Interval> intervals) throws JsonProcessingException
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
DEFAULT_COMPACTION_STATE = new CompactionState(
|
||||
// Expected compaction state to exist after compaction as we store compaction state by default
|
||||
return new CompactionState(
|
||||
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
|
||||
mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class)
|
||||
mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
|
||||
mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
segmentGranularity,
|
||||
queryGranularity,
|
||||
true,
|
||||
intervals
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -238,7 +247,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
|
||||
segments.get(i).getInterval()
|
||||
);
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
if (lockGranularity == LockGranularity.SEGMENT) {
|
||||
Assert.assertEquals(
|
||||
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
|
||||
|
@ -311,10 +323,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
|
||||
final List<DataSegment> segments = resultPair.rhs;
|
||||
Assert.assertEquals(6, segments.size());
|
||||
final CompactionState expectedState = new CompactionState(
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
|
||||
);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
|
||||
|
@ -324,6 +332,21 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
interval,
|
||||
segments.get(segmentIdx).getInterval()
|
||||
);
|
||||
CompactionState expectedState = new CompactionState(
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
|
||||
getObjectMapper().readValue(
|
||||
getObjectMapper().writeValueAsString(
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.NONE,
|
||||
true,
|
||||
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))
|
||||
)
|
||||
),
|
||||
Map.class
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(expectedState, segments.get(segmentIdx).getLastCompactionState());
|
||||
Assert.assertSame(HashBasedNumberedShardSpec.class, segments.get(segmentIdx).getShardSpec().getClass());
|
||||
}
|
||||
|
@ -361,7 +384,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
|
||||
segments.get(i).getInterval()
|
||||
);
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
if (lockGranularity == LockGranularity.SEGMENT) {
|
||||
Assert.assertEquals(
|
||||
new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
|
||||
|
@ -388,7 +414,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
|
||||
segments.get(i).getInterval()
|
||||
);
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
if (lockGranularity == LockGranularity.SEGMENT) {
|
||||
Assert.assertEquals(
|
||||
new NumberedOverwriteShardSpec(
|
||||
|
@ -487,7 +516,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
|
||||
segments.get(i).getInterval()
|
||||
);
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
if (lockGranularity == LockGranularity.SEGMENT) {
|
||||
Assert.assertEquals(
|
||||
new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
|
||||
|
@ -526,7 +558,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
|
||||
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
|
||||
segments.get(0).getLastCompactionState()
|
||||
);
|
||||
|
||||
// hour segmentGranularity
|
||||
final CompactionTask compactionTask2 = builder
|
||||
|
@ -544,7 +579,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
for (int i = 0; i < 3; i++) {
|
||||
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
|
||||
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -781,7 +819,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
|
||||
segments.get(i).getInterval()
|
||||
);
|
||||
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
|
||||
Assert.assertEquals(
|
||||
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of()),
|
||||
segments.get(i).getLastCompactionState()
|
||||
);
|
||||
if (lockGranularity == LockGranularity.SEGMENT) {
|
||||
Assert.assertEquals(
|
||||
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
|
||||
|
|
|
@ -355,6 +355,47 @@ public class CompactionTaskTest
|
|||
segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateCompactionTaskWithGranularitySpec()
|
||||
{
|
||||
final Builder builder = new Builder(
|
||||
DATA_SOURCE,
|
||||
segmentLoaderFactory,
|
||||
RETRY_POLICY_FACTORY
|
||||
);
|
||||
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
|
||||
builder.tuningConfig(createTuningConfig());
|
||||
builder.segmentGranularity(Granularities.HOUR);
|
||||
final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
|
||||
|
||||
final Builder builder2 = new Builder(
|
||||
DATA_SOURCE,
|
||||
segmentLoaderFactory,
|
||||
RETRY_POLICY_FACTORY
|
||||
);
|
||||
builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
|
||||
builder2.tuningConfig(createTuningConfig());
|
||||
builder2.granularitySpec(new UniformGranularitySpec(Granularities.HOUR, Granularities.DAY, null));
|
||||
final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
|
||||
Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity()
|
||||
{
|
||||
final Builder builder = new Builder(
|
||||
DATA_SOURCE,
|
||||
segmentLoaderFactory,
|
||||
RETRY_POLICY_FACTORY
|
||||
);
|
||||
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
|
||||
builder.tuningConfig(createTuningConfig());
|
||||
builder.segmentGranularity(Granularities.HOUR);
|
||||
builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE, Granularities.DAY, null));
|
||||
final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
|
||||
Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeWithInterval() throws IOException
|
||||
{
|
||||
|
|
|
@ -28,7 +28,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
|
|||
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
|
||||
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
|
||||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||
|
@ -41,7 +45,9 @@ import org.apache.druid.tests.TestNGGroup;
|
|||
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
|
||||
import org.apache.druid.tests.indexer.AbstractIndexerTest;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
import org.joda.time.chrono.ISOChronology;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Guice;
|
||||
|
@ -52,8 +58,10 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
@Test(groups = {TestNGGroup.COMPACTION})
|
||||
|
@ -160,7 +168,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
LOG.info("Auto compaction test with hash partitioning");
|
||||
|
||||
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
|
||||
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1);
|
||||
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
|
||||
// 2 segments published per day after compaction.
|
||||
forceTriggerAutoCompaction(4);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
|
@ -175,7 +183,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
"city",
|
||||
false
|
||||
);
|
||||
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1);
|
||||
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
|
||||
forceTriggerAutoCompaction(2);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
verifySegmentsCompacted(rangePartitionsSpec, 2);
|
||||
|
@ -278,6 +286,55 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
|
||||
{
|
||||
loadData(INDEX_TASK);
|
||||
try (final Closeable ignored = unloader(fullDatasourceName)) {
|
||||
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
|
||||
intervalsBeforeCompaction.sort(null);
|
||||
// 4 segments across 2 days (4 total)...
|
||||
verifySegmentsCount(4);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
|
||||
Granularity newGranularity = Granularities.YEAR;
|
||||
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null));
|
||||
|
||||
LOG.info("Auto compaction test with YEAR segment granularity");
|
||||
|
||||
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
|
||||
for (String interval : intervalsBeforeCompaction) {
|
||||
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
|
||||
expectedIntervalAfterCompaction.add(newinterval.toString());
|
||||
}
|
||||
}
|
||||
forceTriggerAutoCompaction(1);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
verifySegmentsCompacted(1, 1000);
|
||||
checkCompactionIntervals(expectedIntervalAfterCompaction);
|
||||
|
||||
newGranularity = Granularities.DAY;
|
||||
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null));
|
||||
|
||||
LOG.info("Auto compaction test with DAY segment granularity");
|
||||
|
||||
// The earlier segment with YEAR granularity is still 'used' as it's not fully overshadowed.
|
||||
// This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
|
||||
// The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01.
|
||||
// Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities
|
||||
// (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01)
|
||||
for (String interval : intervalsBeforeCompaction) {
|
||||
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
|
||||
expectedIntervalAfterCompaction.add(newinterval.toString());
|
||||
}
|
||||
}
|
||||
forceTriggerAutoCompaction(3);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
verifySegmentsCompacted(3, 1000);
|
||||
checkCompactionIntervals(expectedIntervalAfterCompaction);
|
||||
}
|
||||
}
|
||||
|
||||
private void loadData(String indexTask) throws Exception
|
||||
{
|
||||
String taskSpec = getResourceAsString(indexTask);
|
||||
|
@ -314,13 +371,19 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
|
||||
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception
|
||||
{
|
||||
submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1);
|
||||
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null);
|
||||
}
|
||||
|
||||
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, GranularitySpec granularitySpec) throws Exception
|
||||
{
|
||||
submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec);
|
||||
}
|
||||
|
||||
private void submitCompactionConfig(
|
||||
PartitionsSpec partitionsSpec,
|
||||
Period skipOffsetFromLatest,
|
||||
int maxNumConcurrentSubTasks
|
||||
int maxNumConcurrentSubTasks,
|
||||
GranularitySpec granularitySpec
|
||||
) throws Exception
|
||||
{
|
||||
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
|
||||
|
@ -348,6 +411,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
null,
|
||||
1
|
||||
),
|
||||
granularitySpec,
|
||||
null
|
||||
);
|
||||
compactionResource.submitCompactionConfig(compactionConfig);
|
||||
|
@ -415,11 +479,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
|
||||
private void checkCompactionIntervals(List<String> expectedIntervals)
|
||||
{
|
||||
Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> {
|
||||
final List<String> actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName);
|
||||
actualIntervals.sort(null);
|
||||
return actualIntervals.equals(expectedIntervals);
|
||||
final Set<String> actualIntervals = new HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
|
||||
System.out.println("ACTUAL: " + actualIntervals);
|
||||
System.out.println("EXPECTED: " + expectedIntervalsSet);
|
||||
return actualIntervals.equals(expectedIntervalsSet);
|
||||
},
|
||||
"Compaction interval check"
|
||||
);
|
||||
|
|
|
@ -23,11 +23,14 @@ import com.google.inject.Inject;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.GranularityType;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.chrono.ISOChronology;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -37,7 +40,10 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
|
@ -49,6 +55,8 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
|
|||
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
|
||||
|
||||
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
|
||||
private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json";
|
||||
private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json";
|
||||
|
||||
private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";
|
||||
|
||||
|
@ -66,24 +74,41 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
|
|||
@Test
|
||||
public void testCompaction() throws Exception
|
||||
{
|
||||
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
|
||||
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionWithSegmentGranularity() throws Exception
|
||||
{
|
||||
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionWithGranularitySpec() throws Exception
|
||||
{
|
||||
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionWithTimestampDimension() throws Exception
|
||||
{
|
||||
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
|
||||
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null);
|
||||
}
|
||||
|
||||
private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception
|
||||
private void loadDataAndCompact(
|
||||
String indexTask,
|
||||
String queriesResource,
|
||||
String compactionResource,
|
||||
GranularityType newSegmentGranularity
|
||||
) throws Exception
|
||||
{
|
||||
loadData(indexTask);
|
||||
|
||||
// 4 segments across 2 days
|
||||
checkNumberOfSegments(4);
|
||||
|
||||
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
|
||||
intervalsBeforeCompaction.sort(null);
|
||||
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
|
||||
expectedIntervalAfterCompaction.sort(null);
|
||||
try (final Closeable ignored = unloader(fullDatasourceName)) {
|
||||
String queryResponseTemplate;
|
||||
try {
|
||||
|
@ -102,12 +127,23 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
|
|||
|
||||
|
||||
queryHelper.testQueriesFromString(queryResponseTemplate);
|
||||
compactData();
|
||||
compactData(compactionResource, newSegmentGranularity);
|
||||
|
||||
// The original 4 segments should be compacted into 2 new segments
|
||||
checkNumberOfSegments(2);
|
||||
queryHelper.testQueriesFromString(queryResponseTemplate);
|
||||
checkCompactionIntervals(intervalsBeforeCompaction);
|
||||
|
||||
|
||||
if (newSegmentGranularity != null) {
|
||||
List<String> newIntervals = new ArrayList<>();
|
||||
for (String interval : expectedIntervalAfterCompaction) {
|
||||
for (Interval newinterval : newSegmentGranularity.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
|
||||
newIntervals.add(newinterval.toString());
|
||||
}
|
||||
}
|
||||
expectedIntervalAfterCompaction = newIntervals;
|
||||
}
|
||||
checkCompactionIntervals(expectedIntervalAfterCompaction);
|
||||
}
|
||||
}
|
||||
private void loadData(String indexTask) throws Exception
|
||||
|
@ -124,12 +160,19 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
|
|||
);
|
||||
}
|
||||
|
||||
private void compactData() throws Exception
|
||||
private void compactData(String compactionResource, GranularityType newSegmentGranularity) throws Exception
|
||||
{
|
||||
final String template = getResourceAsString(COMPACTION_TASK);
|
||||
String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
|
||||
String template = getResourceAsString(compactionResource);
|
||||
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
|
||||
if (newSegmentGranularity != null) {
|
||||
template = StringUtils.replace(
|
||||
template,
|
||||
"%%SEGMENTGRANULARITY%%",
|
||||
newSegmentGranularity.name()
|
||||
);
|
||||
}
|
||||
|
||||
final String taskID = indexer.submitTask(taskSpec);
|
||||
final String taskID = indexer.submitTask(template);
|
||||
LOG.info("TaskID for compaction task %s", taskID);
|
||||
indexer.waitUntilTaskCompletes(taskID);
|
||||
|
||||
|
@ -153,13 +196,13 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
|
|||
|
||||
private void checkCompactionIntervals(List<String> expectedIntervals)
|
||||
{
|
||||
Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> {
|
||||
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
|
||||
intervalsAfterCompaction.sort(null);
|
||||
System.out.println("AFTER: " + intervalsAfterCompaction);
|
||||
System.out.println("EXPECTED: " + expectedIntervals);
|
||||
return intervalsAfterCompaction.equals(expectedIntervals);
|
||||
final Set<String> intervalsAfterCompaction = new HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
|
||||
System.out.println("ACTUAL: " + intervalsAfterCompaction);
|
||||
System.out.println("EXPECTED: " + expectedIntervalsSet);
|
||||
return intervalsAfterCompaction.equals(expectedIntervalsSet);
|
||||
},
|
||||
"Compaction interval check"
|
||||
);
|
||||
|
|
|
@@ -0,0 +1,17 @@
{
  "type" : "compact",
  "dataSource" : "%%DATASOURCE%%",
  "ioConfig" : {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2013-08-31/2013-09-02"
    }
  },
  "granularitySpec": {
    "segmentGranularity": "%%SEGMENTGRANULARITY%%"
  },
  "context" : {
    "storeCompactionState" : true
  }
}
@@ -0,0 +1,15 @@
{
  "type" : "compact",
  "dataSource" : "%%DATASOURCE%%",
  "ioConfig" : {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2013-08-31/2013-09-02"
    }
  },
  "segmentGranularity": "%%SEGMENTGRANULARITY%%",
  "context" : {
    "storeCompactionState" : true
  }
}
@@ -38,6 +38,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
private final String dataSource;
private final ClientCompactionIOConfig ioConfig;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
private final ClientCompactionTaskQueryGranularitySpec granularitySpec;
private final Map<String, Object> context;

@JsonCreator

@@ -46,6 +47,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") ClientCompactionTaskQueryGranularitySpec granularitySpec,
@JsonProperty("context") Map<String, Object> context
)
{

@@ -53,6 +55,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
this.granularitySpec = granularitySpec;
this.context = context;
}

@@ -89,12 +92,19 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
return tuningConfig;
}

@JsonProperty
public ClientCompactionTaskQueryGranularitySpec getGranularitySpec()
{
return granularitySpec;
}

@JsonProperty
public Map<String, Object> getContext()
{
return context;
}

@Override
public boolean equals(Object o)
{

@@ -109,13 +119,14 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) &&
Objects.equals(context, that.context);
}

@Override
public int hashCode()
{
return Objects.hash(id, dataSource, ioConfig, tuningConfig, context);
return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, context);
}

@Override

@@ -126,6 +137,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
", dataSource='" + dataSource + '\'' +
", ioConfig=" + ioConfig +
", tuningConfig=" + tuningConfig +
", granularitySpec=" + granularitySpec +
", context=" + context +
'}';
}
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.indexing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;

import java.util.Objects;

public class ClientCompactionTaskQueryGranularitySpec
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
private final boolean rollup;

@JsonCreator
public ClientCompactionTaskQueryGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") Granularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.queryGranularity = queryGranularity == null ? BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY : queryGranularity;
this.rollup = rollup == null ? BaseGranularitySpec.DEFAULT_ROLLUP : rollup;
this.segmentGranularity = segmentGranularity == null ? BaseGranularitySpec.DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
}

@JsonProperty
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}

@JsonProperty
public Granularity getQueryGranularity()
{
return queryGranularity;
}

@JsonProperty
public boolean isRollup()
{
return rollup;
}

@Override
public String toString()
{
return "ClientCompactionTaskQueryGranularitySpec{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactionTaskQueryGranularitySpec that = (ClientCompactionTaskQueryGranularitySpec) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
Objects.equals(queryGranularity, that.queryGranularity) &&
Objects.equals(rollup, that.rollup);
}

@Override
public int hashCode()
{
return Objects.hash(segmentGranularity, queryGranularity, rollup);
}
}
@@ -78,7 +78,8 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
)
{

@@ -99,6 +100,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
tuningConfig,
granularitySpec,
context
);
return runTask(taskId, taskQuery);
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
);
@@ -143,5 +143,4 @@ public class ArbitraryGranularitySpec extends BaseGranularitySpec
{
return lookupTableBucketByDateTime;
}

}
@@ -20,10 +20,14 @@
package org.apache.druid.segment.indexing.granularity;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;

@@ -31,10 +35,15 @@ import org.joda.time.Interval;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

abstract class BaseGranularitySpec implements GranularitySpec
public abstract class BaseGranularitySpec implements GranularitySpec
{
public static final Boolean DEFAULT_ROLLUP = Boolean.TRUE;
public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;

protected List<Interval> inputIntervals;
protected final Boolean rollup;

@@ -45,7 +54,7 @@ abstract class BaseGranularitySpec implements GranularitySpec
} else {
this.inputIntervals = Collections.emptyList();
}
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
}

@Override

@@ -76,6 +85,15 @@ abstract class BaseGranularitySpec implements GranularitySpec

protected abstract LookupIntervalBuckets getLookupTableBuckets();

@Override
public Map<String, Object> asMap(ObjectMapper objectMapper)
{
return objectMapper.convertValue(
this,
new TypeReference<Map<String, Object>>() {}
);
}

/**
* This is a helper class to facilitate sharing the code for sortedBucketIntervals among
* the various GranularitySpec implementations. In particular, the UniformGranularitySpec
@@ -21,12 +21,14 @@ package org.apache.druid.segment.indexing.granularity;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**

@@ -79,4 +81,6 @@ public interface GranularitySpec
Granularity getQueryGranularity();

GranularitySpec withIntervals(List<Interval> inputIntervals);

Map<String, Object> asMap(ObjectMapper objectMapper);
}
@@ -21,7 +21,6 @@ package org.apache.druid.segment.indexing.granularity;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
@@ -30,9 +29,6 @@ import java.util.List;

public class UniformGranularitySpec extends BaseGranularitySpec
{
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;

private final Granularity segmentGranularity;
private final Granularity queryGranularity;
private final IntervalsByGranularity intervalsByGranularity;
@@ -144,5 +140,4 @@ public class UniformGranularitySpec extends BaseGranularitySpec
{
return lookupTableBucketByDateTime;
}

}
@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Period;

import javax.annotation.Nullable;
@@ -46,6 +49,7 @@ public class DataSourceCompactionConfig
private final Integer maxRowsPerSegment;
private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig;
private final GranularitySpec granularitySpec;
private final Map<String, Object> taskContext;

@JsonCreator
@@ -56,6 +60,7 @@ public class DataSourceCompactionConfig
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") @Nullable GranularitySpec granularitySpec,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
@@ -69,6 +74,24 @@ public class DataSourceCompactionConfig
this.maxRowsPerSegment = maxRowsPerSegment;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
if (granularitySpec != null) {
Preconditions.checkArgument(
granularitySpec instanceof UniformGranularitySpec,
"Auto compaction granularitySpec only supports uniform type"
);
Preconditions.checkArgument(
granularitySpec.isRollup() == BaseGranularitySpec.DEFAULT_ROLLUP,
"Auto compaction granularitySpec only supports the default rollup value"
);
Preconditions.checkArgument(
granularitySpec.getQueryGranularity().equals(BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY),
"Auto compaction granularitySpec only supports the default query granularity value"
);
Preconditions.checkArgument(
granularitySpec.inputIntervals().isEmpty(),
"Auto compaction granularitySpec does not support an interval value"
);
}
this.granularitySpec = granularitySpec;
this.taskContext = taskContext;
}
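For illustration only (not part of the patch), a minimal sketch of a spec that satisfies the checks above, using the same UniformGranularitySpec constructor the tests later in this diff use:

// Hypothetical sketch, not from the patch: uniform type, default rollup and query granularity,
// and no intervals, so every Preconditions.checkArgument above passes.
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;

public class AutoCompactionGranularitySpecSketch
{
  public static GranularitySpec daySegmentGranularity()
  {
    // segmentGranularity = DAY, queryGranularity = null (defaults to NONE), intervals = null (empty)
    return new UniformGranularitySpec(Granularities.DAY, null, null);
  }
}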
@@ -111,6 +134,13 @@ public class DataSourceCompactionConfig
return tuningConfig;
}

@JsonProperty
@Nullable
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}

@JsonProperty
@Nullable
public Map<String, Object> getTaskContext()
@@ -131,8 +161,10 @@ public class DataSourceCompactionConfig
return taskPriority == that.taskPriority &&
inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) &&
Objects.equals(taskContext, that.taskContext);
}

@@ -143,8 +175,10 @@ public class DataSourceCompactionConfig
dataSource,
taskPriority,
inputSegmentSizeBytes,
maxRowsPerSegment,
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
taskContext
);
}
@@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -123,8 +125,27 @@ public class CompactSegments implements CoordinatorDuty
}
if (COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource());
if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) {
Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
if (configuredSegmentGranularity != null
&& compactionTaskQuery.getGranularitySpec() != null
&& !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) {
// Cancel the active compaction task if its segmentGranularity differs from the config,
// since the interval will need to be re-compacted with the new segmentGranularity
LOG.info("Canceled task[%s] as task segmentGranularity is [%s] but compaction config "
+ "segmentGranularity is [%s]",
status.getId(),
compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
configuredSegmentGranularity);
indexingServiceClient.cancelTask(status.getId());
continue;
}
}
// Skip this interval, as the currently active compaction task is still valid for it
final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
// Since we keep the current active compaction task running, we count the active task slots
numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask(
compactionTaskQuery.getTuningConfig()
);
@@ -289,12 +310,24 @@ public class CompactSegments implements CoordinatorDuty
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
ClientCompactionTaskQueryGranularitySpec queryGranularitySpec;
if (config.getGranularitySpec() != null) {
queryGranularitySpec = new ClientCompactionTaskQueryGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup()
);
} else {
queryGranularitySpec = null;
}

// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
queryGranularitySpec,
newAutoCompactionContext(config.getTaskContext())
);
@@ -28,19 +28,27 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -51,12 +59,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;

/**
@@ -75,6 +85,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
// searchIntervals keeps track of the current state of which interval should be considered to search segments to
// compact.
private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
// This is needed for datasources that have segmentGranularity configured.
// If the segmentGranularity configured in the config is finer than the current segmentGranularity, the same set of
// segments can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();

private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval)
@@ -93,12 +108,53 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator

dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);

Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>();
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity();
// Create a new timeline to hold segments in the new configured segment granularity
VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder());
Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
for (DataSegment segment : segments) {
// Bucket the original segment intervals by the configuredSegmentGranularity.
// For example, if the original interval is 2020-01-28/2020-02-03 with WEEK granularity
// and the configuredSegmentGranularity is MONTH, the segment will be split into two segments
// of 2020-01/2020-02 and 2020-02/2020-03.
for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) {
intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment);
}
}
for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) {
Interval interval = partitionsPerInterval.getKey();
int partitionNum = 0;
Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
int partitions = segmentSet.size();
for (DataSegment segment : segmentSet) {
DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions));
// A PartitionHolder can only hold chunks of one partition space.
// However, a partition in the new timeline (timelineWithConfiguredSegmentGranularity) can hold multiple
// partitions of the original timeline (when the new segmentGranularity is larger than the original
// segmentGranularity). Hence, we group all the segments of the original timeline into intervals bucketed
// by the new configuredSegmentGranularity. We then convert each segment into a new partition space so that
// there is no duplicate partitionNum across all segments of each new Interval. We will have to save the
// original ShardSpec to convert the segment back when returning from the iterator.
originalShardSpecs.put(new Pair<>(interval, segmentsForCompact.getId()), segment.getShardSpec());
timelineWithConfiguredSegmentGranularity.add(
interval,
segmentsForCompact.getVersion(),
NumberedPartitionChunk.make(partitionNum, partitions, segmentsForCompact)
);
partitionNum += 1;
}
}
timeline = timelineWithConfiguredSegmentGranularity;
}
final List<Interval> searchIntervals =
findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource));
if (!searchIntervals.isEmpty()) {
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalShardSpecs));
}
}
});
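For reference, a minimal standalone sketch (not part of the patch) of the interval bucketing the loop above relies on; it only uses Granularity.getIterable, which the patch already calls:

// Hypothetical illustration, not from the patch: a segment interval of 2020-01-28/2020-02-03
// bucketed by a configured MONTH segmentGranularity yields 2020-01-01/2020-02-01 and
// 2020-02-01/2020-03-01, matching the example in the comment above.
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class GranularityBucketingSketch
{
  public static void main(String[] args)
  {
    Interval original = Intervals.of("2020-01-28/2020-02-03");
    for (Interval monthBucket : Granularities.MONTH.getIterable(original)) {
      System.out.println(monthBucket);
    }
  }
}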
@@ -187,10 +243,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
private final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs;

CompactibleTimelineObjectHolderCursor(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<Interval> totalIntervalsToSearch
List<Interval> totalIntervalsToSearch,
Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs
)
{
this.holders = totalIntervalsToSearch
@@ -201,6 +259,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.filter(holder -> isCompactibleHolder(interval, holder))
)
.collect(Collectors.toList());
this.originalShardSpecs = originalShardSpecs;
}

private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder<String, DataSegment> holder)
@@ -220,6 +279,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return partitionBytes > 0;
}

private DataSegment transformShardSpecIfNeeded(DataSegment dataSegment, Interval interval)
{
if (originalShardSpecs != null && !originalShardSpecs.isEmpty()) {
return dataSegment.withShardSpec(originalShardSpecs.get(new Pair<>(interval, dataSegment.getId())));
}
return dataSegment;
}

@Override
public boolean hasNext()
{
@@ -232,8 +299,10 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (holders.isEmpty()) {
throw new NoSuchElementException();
}
return Streams.sequentialStreamFrom(holders.remove(holders.size() - 1).getObject())
TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1);
return Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.map(dataSegment -> transformShardSpecIfNeeded(dataSegment, timelineObjectHolder.getTrueInterval()))
.collect(Collectors.toList());
}
}
@@ -257,10 +326,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
}

private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConfig, SegmentsToCompact candidates)
private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");

final ClientCompactionTaskQueryTuningConfig tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
final PartitionsSpec partitionsSpecFromConfig = findPartitinosSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
@@ -314,6 +384,22 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
needsCompaction = true;
}

// Only check segmentGranularity, as auto-compaction currently only supports configuring segmentGranularity
final Granularity segmentGranularity = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() :
null;

if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null &&
!config.getGranularitySpec().getSegmentGranularity().equals(segmentGranularity)) {
log.info(
"Configured granularitySpec[%s] is different from the one[%s] of segments. Needs compaction",
config.getGranularitySpec(),
segmentGranularity
);
needsCompaction = true;
}

return needsCompaction;
}
@@ -334,16 +420,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final long inputSegmentSize = config.getInputSegmentSizeBytes();

while (compactibleTimelineObjectHolderCursor.hasNext()) {
final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());

List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next();
final SegmentsToCompact candidates = new SegmentsToCompact(segments);
if (!candidates.isEmpty()) {
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean needsCompaction = needsCompaction(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
config,
candidates
);

if (isCompactibleSize && needsCompaction) {
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
Interval interval = candidates.getUmbrellaInterval();
Set<Interval> intervalsCompacted = intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new HashSet<>());
// Skip these candidates if we have already compacted this interval
if (intervalsCompacted.contains(interval)) {
continue;
}
intervalsCompacted.add(interval);
}
return candidates;
} else {
if (!needsCompaction) {
@@ -396,6 +491,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static List<Interval> findInitialSearchInterval(
VersionedIntervalTimeline<String, DataSegment> timeline,
Period skipOffset,
Granularity configuredSegmentGranularity,
@Nullable List<Interval> skipIntervals
)
{
@@ -407,6 +503,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
last.getInterval().getEnd(),
skipOffset,
configuredSegmentGranularity,
skipIntervals
);
@@ -447,19 +544,27 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
static List<Interval> sortAndAddSkipIntervalFromLatest(
DateTime latest,
Period skipOffset,
Granularity configuredSegmentGranularity,
@Nullable List<Interval> skipIntervals
)
{
final List<Interval> nonNullSkipIntervals = skipIntervals == null
? new ArrayList<>(1)
: new ArrayList<>(skipIntervals.size());
final Interval skipFromLatest;
if (configuredSegmentGranularity != null) {
DateTime skipOffsetFromLatest = new DateTime(latest, latest.getZone()).minus(skipOffset);
DateTime skipOffsetBucketToSegmentGranularity = configuredSegmentGranularity.bucketStart(skipOffsetFromLatest);
skipFromLatest = new Interval(skipOffsetBucketToSegmentGranularity, latest);
} else {
skipFromLatest = new Interval(skipOffset, latest);
}

if (skipIntervals != null) {
final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals);
sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());

final List<Interval> overlapIntervals = new ArrayList<>();
final Interval skipFromLatest = new Interval(skipOffset, latest);

for (Interval interval : sortedSkipIntervals) {
if (interval.overlaps(skipFromLatest)) {
@@ -476,7 +581,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
nonNullSkipIntervals.add(skipFromLatest);
}
} else {
final Interval skipFromLatest = new Interval(skipOffset, latest);
nonNullSkipIntervals.add(skipFromLatest);
}
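A rough sketch (not from the patch) of the skip-offset widening above: the offset point is pushed back to the start of its configuredSegmentGranularity bucket before the skip interval is built, so partially filled buckets near the latest data are not compacted.

// Hypothetical illustration, not from the patch: with latest = 2019-01-01T12:00Z and skipOffset = P1D,
// a configured MONTH segmentGranularity widens the skipped interval back to 2018-12-01T00:00Z/latest.
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

public class SkipOffsetBucketingSketch
{
  public static void main(String[] args)
  {
    DateTime latest = DateTimes.of("2019-01-01T12:00:00Z");
    DateTime offsetFromLatest = latest.minus(new Period("P1D"));                // 2018-12-31T12:00Z
    DateTime bucketStart = Granularities.MONTH.bucketStart(offsetFromLatest);  // 2018-12-01T00:00Z
    System.out.println(new Interval(bucketStart, latest));
  }
}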
@@ -579,6 +683,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return segments.size();
}

private Interval getUmbrellaInterval()
{
return JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
}

private long getNumberOfIntervals()
{
return segments.stream().map(DataSegment::getInterval).distinct().count();
@@ -50,6 +50,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
)
{
@ -32,6 +32,7 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ArbitraryGranularityTest
|
||||
{
|
||||
|
@ -217,6 +218,32 @@ public class ArbitraryGranularityTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsMap()
|
||||
{
|
||||
final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, Lists.newArrayList(
|
||||
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
|
||||
Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
|
||||
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
|
||||
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
|
||||
Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
|
||||
));
|
||||
|
||||
Map<String, Object> map = spec.asMap(JSON_MAPPER);
|
||||
final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, GranularitySpec.class);
|
||||
Assert.assertEquals(
|
||||
"Round-trip",
|
||||
ImmutableList.copyOf(spec.sortedBucketIntervals()),
|
||||
ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"Round-trip",
|
||||
ImmutableList.copyOf(spec.inputIntervals()),
|
||||
ImmutableList.copyOf(rtSpec.inputIntervals())
|
||||
);
|
||||
Assert.assertEquals(spec, rtSpec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullInputIntervals()
|
||||
{
|
||||
|
|
|
@ -38,10 +38,11 @@ import org.junit.Test;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class UniformGranularityTest
|
||||
{
|
||||
private static final ObjectMapper JOSN_MAPPER = new DefaultObjectMapper();
|
||||
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
|
||||
|
||||
@Test
|
||||
public void testSimple()
|
||||
|
@ -161,7 +162,7 @@ public class UniformGranularityTest
|
|||
);
|
||||
|
||||
try {
|
||||
final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class);
|
||||
final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class);
|
||||
Assert.assertEquals(
|
||||
"Round-trip sortedBucketIntervals",
|
||||
ImmutableList.copyOf(spec.sortedBucketIntervals()),
|
||||
|
@ -178,6 +179,34 @@ public class UniformGranularityTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsMap()
|
||||
{
|
||||
final GranularitySpec spec = new UniformGranularitySpec(
|
||||
Granularities.DAY,
|
||||
null,
|
||||
Lists.newArrayList(
|
||||
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
|
||||
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
|
||||
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
|
||||
Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
|
||||
)
|
||||
);
|
||||
Map<String, Object> map = spec.asMap(JSON_MAPPER);
|
||||
final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, GranularitySpec.class);
|
||||
Assert.assertEquals(
|
||||
"Round-trip sortedBucketIntervals",
|
||||
ImmutableList.copyOf(spec.sortedBucketIntervals()),
|
||||
ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"Round-trip granularity",
|
||||
spec.getSegmentGranularity(),
|
||||
rtSpec.getSegmentGranularity()
|
||||
);
|
||||
Assert.assertEquals(spec, rtSpec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals()
|
||||
{
|
||||
|
|
|
@ -20,15 +20,20 @@
|
|||
package org.apache.druid.server.coordinator;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.data.input.SegmentsSplitHintSpec;
|
||||
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.HumanReadableBytes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.segment.IndexSpec;
|
||||
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
|
||||
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
|
||||
import org.apache.druid.segment.data.CompressionStrategy;
|
||||
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
|
@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
|
|||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
null,
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
final String json = OBJECT_MAPPER.writeValueAsString(config);
|
||||
|
@ -68,6 +74,7 @@ public class DataSourceCompactionConfigTest
|
|||
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
|
||||
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
|
||||
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
|
||||
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -80,6 +87,7 @@ public class DataSourceCompactionConfigTest
|
|||
30,
|
||||
new Period(3600),
|
||||
null,
|
||||
null,
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
final String json = OBJECT_MAPPER.writeValueAsString(config);
|
||||
|
@ -122,6 +130,7 @@ public class DataSourceCompactionConfigTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
final String json = OBJECT_MAPPER.writeValueAsString(config);
|
||||
|
@ -164,6 +173,7 @@ public class DataSourceCompactionConfigTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
|
||||
|
@ -217,4 +227,92 @@ public class DataSourceCompactionConfigTest
|
|||
OBJECT_MAPPER.readValue(json, UserCompactionTaskQueryTuningConfig.class);
|
||||
Assert.assertEquals(tuningConfig, fromJson);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeGranularitySpec() throws IOException
|
||||
{
|
||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
||||
"dataSource",
|
||||
null,
|
||||
500L,
|
||||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
new UniformGranularitySpec(Granularities.HOUR, null, null),
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
final String json = OBJECT_MAPPER.writeValueAsString(config);
|
||||
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
|
||||
|
||||
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
|
||||
Assert.assertEquals(25, fromJson.getTaskPriority());
|
||||
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
|
||||
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
|
||||
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
|
||||
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
|
||||
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity()
|
||||
{
|
||||
new DataSourceCompactionConfig(
|
||||
"dataSource",
|
||||
null,
|
||||
500L,
|
||||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, null),
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testFailIfGranularitySpecContainsNonDefaultRollup()
|
||||
{
|
||||
new DataSourceCompactionConfig(
|
||||
"dataSource",
|
||||
null,
|
||||
500L,
|
||||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, false, null),
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testFailIfGranularitySpecContainsNonEmptyInterval()
|
||||
{
|
||||
new DataSourceCompactionConfig(
|
||||
"dataSource",
|
||||
null,
|
||||
500L,
|
||||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, ImmutableList.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z"))),
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testFailIfGranularitySpecIsNotUniform()
|
||||
{
|
||||
new DataSourceCompactionConfig(
|
||||
"dataSource",
|
||||
null,
|
||||
500L,
|
||||
null,
|
||||
new Period(3600),
|
||||
null,
|
||||
new ArbitraryGranularitySpec(null, null, null),
|
||||
ImmutableMap.of("key", "val")
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -28,16 +28,24 @@ import com.google.common.collect.Lists;
|
|||
import junitparams.converters.Nullable;
|
||||
import org.apache.commons.lang3.mutable.MutableInt;
|
||||
import org.apache.druid.client.DataSourcesSnapshot;
|
||||
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
|
||||
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
|
||||
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
|
||||
import org.apache.druid.client.indexing.ClientTaskQuery;
|
||||
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
|
||||
import org.apache.druid.client.indexing.IndexingWorker;
|
||||
import org.apache.druid.client.indexing.IndexingWorkerInfo;
|
||||
import org.apache.druid.client.indexing.TaskPayloadResponse;
|
||||
import org.apache.druid.discovery.DruidLeaderClient;
|
||||
import org.apache.druid.discovery.DruidNodeDiscovery;
|
||||
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
import org.apache.druid.indexer.RunnerTaskState;
|
||||
import org.apache.druid.indexer.TaskLocation;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.indexer.TaskStatusPlus;
|
||||
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||
|
@ -47,8 +55,10 @@ import org.apache.druid.java.util.common.DateTimes;
|
|||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.http.client.Request;
|
||||
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.server.DruidNode;
|
||||
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
|
||||
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
|
||||
|
@ -81,6 +91,8 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -89,6 +101,7 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
@@ -101,6 +114,7 @@ public class CompactSegmentsTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DATA_SOURCE_PREFIX = "dataSource_";
private static final int PARTITION_PER_TIME_INTERVAL = 4;
// Each dataSource starts with 440 bytes, 44 segments, and 11 intervals needing compaction
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
@ -144,6 +158,7 @@ public class CompactSegmentsTest
|
|||
private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
|
||||
|
||||
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
|
||||
Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
|
||||
|
||||
public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
|
||||
{
|
||||
|
@ -154,18 +169,23 @@ public class CompactSegmentsTest
|
|||
@Before
|
||||
public void setup()
|
||||
{
|
||||
List<DataSegment> segments = new ArrayList<>();
|
||||
List<DataSegment> allSegments = new ArrayList<>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final String dataSource = DATA_SOURCE_PREFIX + i;
|
||||
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
|
||||
for (int k = 0; k < 4; k++) {
|
||||
segments.add(createSegment(dataSource, j, true, k));
|
||||
segments.add(createSegment(dataSource, j, false, k));
|
||||
for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
|
||||
List<DataSegment> segmentForDatasource = datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>());
|
||||
DataSegment dataSegment = createSegment(dataSource, j, true, k);
|
||||
allSegments.add(dataSegment);
|
||||
segmentForDatasource.add(dataSegment);
|
||||
dataSegment = createSegment(dataSource, j, false, k);
|
||||
allSegments.add(dataSegment);
|
||||
segmentForDatasource.add(dataSegment);
|
||||
}
|
||||
}
|
||||
}
|
||||
dataSources = DataSourcesSnapshot
|
||||
.fromUsedSegments(segments, ImmutableMap.of())
|
||||
.fromUsedSegments(allSegments, ImmutableMap.of())
|
||||
.getUsedSegmentsTimelinesPerDataSource();
|
||||
}
|
||||
|
||||
|
@ -351,17 +371,17 @@ public class CompactSegmentsTest
|
|||
String dataSourceName = DATA_SOURCE_PREFIX + 1;
|
||||
List<DataSegment> segments = new ArrayList<>();
|
||||
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
|
||||
for (int k = 0; k < 4; k++) {
|
||||
for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
|
||||
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
|
||||
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
|
||||
if (j == 3) {
|
||||
// Make two intervals on this day compacted (two compacted intervals back-to-back)
|
||||
beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
|
||||
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
|
||||
beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
|
||||
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
|
||||
}
|
||||
if (j == 1) {
|
||||
// Make one interval on this day compacted
|
||||
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
|
||||
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
|
||||
}
|
||||
segments.add(beforeNoon);
|
||||
segments.add(afterNoon);
|
||||
|
@ -538,6 +558,205 @@ public class CompactSegmentsTest
|
|||
Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactWithoutGranularitySpec()
|
||||
{
|
||||
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
|
||||
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
|
||||
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
|
||||
final String dataSource = DATA_SOURCE_PREFIX + 0;
|
||||
compactionConfigs.add(
|
||||
new DataSourceCompactionConfig(
|
||||
dataSource,
|
||||
0,
|
||||
500L,
|
||||
null,
|
||||
new Period("PT0H"), // smaller than segment interval
|
||||
new UserCompactionTaskQueryTuningConfig(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
partitionsSpec,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
3,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
)
|
||||
);
|
||||
doCompactSegments(compactSegments, compactionConfigs);
|
||||
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
|
||||
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
|
||||
Mockito.verify(mockIndexingServiceClient).compactSegments(
|
||||
ArgumentMatchers.anyString(),
|
||||
segmentsCaptor.capture(),
|
||||
ArgumentMatchers.anyInt(),
|
||||
ArgumentMatchers.any(),
|
||||
granularitySpecArgumentCaptor.capture(),
|
||||
ArgumentMatchers.any()
|
||||
);
|
||||
// Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since the segment granularity is the same
Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size());
|
||||
Assert.assertNull(granularitySpecArgumentCaptor.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactWithGranularitySpec()
|
||||
{
|
||||
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
|
||||
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
|
||||
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
|
||||
final String dataSource = DATA_SOURCE_PREFIX + 0;
|
||||
compactionConfigs.add(
|
||||
new DataSourceCompactionConfig(
|
||||
dataSource,
|
||||
0,
|
||||
500L,
|
||||
null,
|
||||
new Period("PT0H"), // smaller than segment interval
|
||||
new UserCompactionTaskQueryTuningConfig(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
partitionsSpec,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
3,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.YEAR, null, null),
|
||||
null
|
||||
)
|
||||
);
|
||||
doCompactSegments(compactSegments, compactionConfigs);
|
||||
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
|
||||
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
|
||||
Mockito.verify(mockIndexingServiceClient).compactSegments(
|
||||
ArgumentMatchers.anyString(),
|
||||
segmentsCaptor.capture(),
|
||||
ArgumentMatchers.anyInt(),
|
||||
ArgumentMatchers.any(),
|
||||
granularitySpecArgumentCaptor.capture(),
|
||||
ArgumentMatchers.any()
|
||||
);
|
||||
// All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
// are within the same year
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
|
||||
Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
|
||||
{
|
||||
final String dataSource = DATA_SOURCE_PREFIX + 0;
|
||||
final String conflictTaskId = "taskIdDummy";
|
||||
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
|
||||
TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus(
|
||||
conflictTaskId,
|
||||
"groupId",
|
||||
"compact",
|
||||
DateTimes.EPOCH,
|
||||
DateTimes.EPOCH,
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.RUNNING,
|
||||
-1L,
|
||||
TaskLocation.unknown(),
|
||||
dataSource,
|
||||
null
|
||||
);
|
||||
TaskPayloadResponse runningConflictCompactionTaskPayload = new TaskPayloadResponse(
|
||||
conflictTaskId,
|
||||
new ClientCompactionTaskQuery(
|
||||
conflictTaskId,
|
||||
dataSource,
|
||||
new ClientCompactionIOConfig(
|
||||
new ClientCompactionIntervalSpec(
|
||||
Intervals.of("2000/2099"),
|
||||
"testSha256OfSortedSegmentIds"
|
||||
)
|
||||
),
|
||||
null,
|
||||
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, null, null),
|
||||
null
|
||||
)
|
||||
);
|
||||
Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
|
||||
Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
|
||||
|
||||
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
|
||||
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
|
||||
compactionConfigs.add(
|
||||
new DataSourceCompactionConfig(
|
||||
dataSource,
|
||||
0,
|
||||
500L,
|
||||
null,
|
||||
new Period("PT0H"), // smaller than segment interval
|
||||
new UserCompactionTaskQueryTuningConfig(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
partitionsSpec,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
3,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.YEAR, null, null),
|
||||
null
|
||||
)
|
||||
);
|
||||
doCompactSegments(compactSegments, compactionConfigs);
|
||||
// Verify that conflict task was canceled
|
||||
Mockito.verify(mockIndexingServiceClient).cancelTask(conflictTaskId);
|
||||
// The active conflict task has an interval of 2000/2099.
// Make sure that we do not skip the interval of the conflict task,
// since we cancel that task and will have to compact those intervals with the new segmentGranularity.
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
|
||||
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
|
||||
Mockito.verify(mockIndexingServiceClient).compactSegments(
|
||||
ArgumentMatchers.anyString(),
|
||||
segmentsCaptor.capture(),
|
||||
ArgumentMatchers.anyInt(),
|
||||
ArgumentMatchers.any(),
|
||||
granularitySpecArgumentCaptor.capture(),
|
||||
ArgumentMatchers.any()
|
||||
);
|
||||
// All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
// are within the same year
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
|
||||
Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunParallelCompactionMultipleCompactionTaskSlots()
|
||||
{
|
||||
|
@ -831,6 +1050,7 @@ public class CompactSegmentsTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
)
|
||||
);
|
||||
|
@ -984,7 +1204,8 @@ public class CompactSegmentsTest
|
|||
"lz4",
|
||||
"longEncoding",
|
||||
"longs"
|
||||
)
|
||||
),
|
||||
ImmutableMap.of()
|
||||
),
|
||||
1,
|
||||
segmentSize
|
||||
|
|
|
@ -71,6 +71,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
|
||||
DateTimes.of("2019-01-01"),
|
||||
new Period(72, 0, 0, 0),
|
||||
null,
|
||||
ImmutableList.of(
|
||||
Intervals.of("2018-12-30/2018-12-31"),
|
||||
Intervals.of("2018-12-24/2018-12-25")
|
||||
|
@ -90,6 +91,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -128,6 +130,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -166,6 +169,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -204,6 +208,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -242,6 +247,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -280,6 +286,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -318,6 +325,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -356,6 +364,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
@ -394,6 +403,7 @@ public class NewestSegmentFirstIteratorTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
Assert.assertEquals(
|
||||
|
|
|
@ -22,10 +22,15 @@ package org.apache.druid.server.coordinator.duty;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Comparators;
|
||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.Partitions;
|
||||
|
@ -58,7 +63,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
{
|
||||
final Period segmentPeriod = new Period("PT1H");
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
createTimeline(
|
||||
|
@ -83,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
{
|
||||
final Period segmentPeriod = new Period("PT1H");
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
createTimeline(
|
||||
|
@ -116,7 +121,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
{
|
||||
final Period segmentPeriod = new Period("PT1H");
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
createTimeline(
|
||||
|
@ -149,7 +154,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
public void testHugeShard()
|
||||
{
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
createTimeline(
|
||||
|
@ -199,7 +204,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
public void testManySegmentsPerShard()
|
||||
{
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
createTimeline(
|
||||
|
@ -259,9 +264,9 @@ public class NewestSegmentFirstPolicyTest
|
|||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(
|
||||
unknownDataSource,
|
||||
createCompactionConfig(10000, new Period("P2D")),
|
||||
createCompactionConfig(10000, new Period("P2D"), null),
|
||||
DATA_SOURCE,
|
||||
createCompactionConfig(10000, new Period("P2D"))
|
||||
createCompactionConfig(10000, new Period("P2D"), null)
|
||||
),
|
||||
ImmutableMap.of(
|
||||
DATA_SOURCE,
|
||||
|
@ -307,7 +312,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
)
|
||||
);
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
|
||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||
Collections.emptyMap()
|
||||
);
|
||||
|
@ -340,7 +345,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
);
|
||||
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||
Collections.emptyMap()
|
||||
);
|
||||
|
@ -361,7 +366,7 @@ public class NewestSegmentFirstPolicyTest
|
|||
);
|
||||
|
||||
final CompactionSegmentIterator iterator = policy.reset(
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
|
||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||
Collections.emptyMap()
|
||||
);
|
||||
|
@@ -369,12 +374,88 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}

@Test
public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")),
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);

// We should only get segments in Oct
final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
);

Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
}

@Test
public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
// This contains segments that:
// - Cross the month boundary of the latest month (start in Nov and end in Dec). These should be skipped
// - Are fully in the latest month (start in Dec and end in Dec). These should be skipped
// - Do not overlap the latest month (start in Oct and end in Oct). These should not be skipped
new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);

// We should only get segments in Oct
final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
);

Assert.assertTrue(iterator.hasNext());
List<DataSegment> actual = iterator.next();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
Assert.assertFalse(iterator.hasNext());
}

@Test
public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);

// We should only get segments in Oct
final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
);

Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
}

@Test
public void testWithSkipIntervals()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
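The three skip-offset tests added above exercise how the skip offset interacts with a configured segmentGranularity: the skip window is effectively widened to the granularity bucket that the offset boundary falls into, so segments that merely touch that bucket are also skipped. A minimal sketch of the idea, using plain Joda-Time and a hypothetical skipWindow helper rather than the iterator's actual code:

    import org.joda.time.DateTime;
    import org.joda.time.Interval;
    import org.joda.time.Period;

    public class SkipOffsetSketch
    {
      // Hypothetical helper: pull the "skip from latest" boundary back to the
      // start of its month bucket, mimicking a MONTH segmentGranularity.
      static Interval skipWindow(DateTime latest, Period skipOffset)
      {
        DateTime rawBoundary = latest.minus(skipOffset);                           // 2017-12-02T00:00Z for P1D
        DateTime monthAligned = rawBoundary.withDayOfMonth(1).withTimeAtStartOfDay();
        return new Interval(monthAligned, latest);                                 // 2017-12-01/2017-12-03
      }

      public static void main(String[] args)
      {
        Interval window = skipWindow(DateTime.parse("2017-12-03T00:00:00Z"), new Period("P1D"));
        // Ends before the raw P1D boundary but inside December, so it is skipped
        // only because the window was widened to the month boundary:
        System.out.println(window.overlaps(Interval.parse("2017-11-30T23:00:00Z/2017-12-01T04:00:00Z"))); // true
        // Entirely in October, so it stays eligible for compaction:
        System.out.println(window.overlaps(Interval.parse("2017-10-14T00:00:00Z/2017-10-15T00:00:00Z"))); // false
      }
    }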
@@ -414,7 +495,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -455,6 +536,113 @@ public class NewestSegmentFirstPolicyTest
);
}

@Test
public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
// Segments with day interval from Oct to Dec
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);

// We should get all segments in the timeline back since the skip offset is P0D.
// However, we only need to iterate 3 times (once for each month) since the configured segmentGranularity is MONTH,
// and hence the iterator returns all segments bucketed to the configured segmentGranularity.
// Month of Dec
Assert.assertTrue(iterator.hasNext());
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-31T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// Month of Nov
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-11-01T00:00:00/2017-12-01T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// Month of Oct
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-11-01T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// No more
Assert.assertFalse(iterator.hasNext());
}

@Test
public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")),
new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")),
new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
// We should get the segment of "2020-01-28/2020-02-03" back twice: once when the iterator returns for Jan and
// once when the iterator returns for Feb.

// Month of Jan
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
List<DataSegment> actual = iterator.next();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// Month of Feb
expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
actual = iterator.next();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// No more
Assert.assertFalse(iterator.hasNext());
}

@Test
public void testIteratorDoesNotReturnCompactedInterval()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
);

final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);

final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
// The iterator should return only once since all the "minute" intervals of the iterator map to the same segment interval
Assert.assertFalse(iterator.hasNext());
}

private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
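The cross-boundary test above relies on a segment interval that overlaps two month buckets being a compaction candidate in both. A small sketch of that overlap check, again plain Joda-Time with illustrative names rather than the policy's real bucketing code:

    import org.joda.time.DateTime;
    import org.joda.time.Interval;

    public class MonthBucketSketch
    {
      public static void main(String[] args)
      {
        // The weekly segment from the test above, plus the two month buckets it can fall into.
        Interval segment = new Interval(DateTime.parse("2020-01-28T00:00:00Z"), DateTime.parse("2020-02-03T00:00:00Z"));
        Interval jan = new Interval(DateTime.parse("2020-01-01T00:00:00Z"), DateTime.parse("2020-02-01T00:00:00Z"));
        Interval feb = new Interval(DateTime.parse("2020-02-01T00:00:00Z"), DateTime.parse("2020-03-01T00:00:00Z"));

        System.out.println(segment.overlaps(jan));   // true -> candidate when the iterator handles January
        System.out.println(segment.overlaps(feb));   // true -> candidate when the iterator handles February
      }
    }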
@@ -546,7 +734,8 @@ public class NewestSegmentFirstPolicyTest

private DataSourceCompactionConfig createCompactionConfig(
long inputSegmentSizeBytes,
Period skipOffsetFromLatest
Period skipOffsetFromLatest,
GranularitySpec granularitySpec
)
{
return new DataSourceCompactionConfig(

@@ -556,6 +745,7 @@ public class NewestSegmentFirstPolicyTest
null,
skipOffsetFromLatest,
null,
granularitySpec,
null
);
}
@@ -277,7 +277,8 @@ public class SystemSchemaTest extends CalciteTestBase

private final CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
Collections.singletonMap("test", "map")
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);

private final DataSegment publishedCompactedSegment1 = new DataSegment(
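The SystemSchemaTest fixture above uses the new three-argument CompactionState constructor. A short sketch of how it might be populated; the map keys and values here are illustrative assumptions, only the constructor shape comes from this change:

    import java.util.Collections;
    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.timeline.CompactionState;

    public class CompactionStateSketch
    {
      public static void main(String[] args)
      {
        // The last-compaction state of a segment now carries the granularity spec
        // as a plain Map, next to the partitionsSpec and the indexSpec.
        CompactionState state = new CompactionState(
            new DynamicPartitionsSpec(null, null),
            Collections.singletonMap("bitmap", "roaring"),            // indexSpec entry (illustrative only)
            Collections.singletonMap("segmentGranularity", "DAY")     // granularitySpec entry (illustrative only)
        );
        System.out.println(state);
      }
    }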