mirror of https://github.com/apache/druid.git
Fix auto compaction to consider intervals of running tasks (#6767)
* Fix auto compaction to consider intervals of running tasks
* Adjust initial collection size
This commit is contained in:
parent 7c7997e8a1
commit fa7cb906e4
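What the change does, in brief: before building a new compaction queue, the coordinator now asks the overlord for the payload of every compact task that is still running, pending, or waiting, collapses each payload's segment list into one umbrella interval per datasource, and passes those intervals to the segment search policy as "skip intervals" so that data already being compacted is not picked again. A rough sketch of that flow in Java follows; the helper name collectSkipIntervals and the surrounding fields (indexingServiceClient, policy, compactionConfigs, dataSources) are illustrative stand-ins rather than the literal coordinator code.

    // Sketch only: build per-datasource "skip intervals" from in-flight compact tasks.
    private Map<String, List<Interval>> collectSkipIntervals(List<TaskStatusPlus> compactTasks)
    {
      final Map<String, List<Interval>> skipIntervals = new HashMap<>();
      for (TaskStatusPlus task : compactTasks) {
        // The overlord returns the task payload; for compact tasks it is a ClientCompactQuery.
        final ClientCompactQuery payload =
            (ClientCompactQuery) indexingServiceClient.getTaskPayload(task.getId()).getPayload();
        // One umbrella interval per running task, covering every segment it is compacting.
        final Interval busyInterval = JodaUtils.umbrellaInterval(
            payload.getSegments().stream().map(DataSegment::getInterval).collect(Collectors.toList())
        );
        skipIntervals.computeIfAbsent(task.getDataSource(), k -> new ArrayList<>()).add(busyInterval);
      }
      return skipIntervals;
    }

    // The policy then excludes those intervals when choosing segments to compact:
    // policy.reset(compactionConfigs, dataSources, collectSkipIntervals(compactTasks));
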
NewestSegmentFirstPolicyBenchmark.java:
@@ -42,6 +42,7 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.infra.Blackhole;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,7 +138,7 @@ public class NewestSegmentFirstPolicyBenchmark
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       final List<DataSegment> segments = iterator.next();
       blackhole.consume(segments);

ClientAppendQuery.java:
@@ -27,7 +27,7 @@ import java.util.List;
 
 /**
  */
-public class ClientAppendQuery
+public class ClientAppendQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -43,12 +43,14 @@ public class ClientAppendQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "append";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;

ClientCompactQuery.java:
@@ -26,7 +26,7 @@ import org.apache.druid.timeline.DataSegment;
 import java.util.List;
 import java.util.Map;
 
-public class ClientCompactQuery
+public class ClientCompactQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -54,12 +54,14 @@ public class ClientCompactQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "compact";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;

ClientKillQuery.java:
@@ -25,7 +25,7 @@ import org.joda.time.Interval;
 
 /**
  */
-public class ClientKillQuery
+public class ClientKillQuery implements ClientQuery
 {
   private final String dataSource;
   private final Interval interval;
@@ -41,12 +41,14 @@ public class ClientKillQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "kill";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;

ClientMergeQuery.java:
@@ -28,7 +28,7 @@ import java.util.List;
 
 /**
  */
-public class ClientMergeQuery
+public class ClientMergeQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -48,12 +48,14 @@ public class ClientMergeQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "merge";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;

ClientQuery.java (new file):
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * org.apache.druid.indexing.common.task.Task representation for clients
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "append", value = ClientAppendQuery.class),
+    @Type(name = "merge", value = ClientMergeQuery.class),
+    @Type(name = "kill", value = ClientKillQuery.class),
+    @Type(name = "compact", value = ClientCompactQuery.class)
+})
+public interface ClientQuery
+{
+  String getType();
+
+  String getDataSource();
+}

HttpIndexingServiceClient.java:
@@ -278,6 +278,26 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     return completeTaskStatuses.isEmpty() ? null : completeTaskStatuses.get(0);
   }
 
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    try {
+      final FullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+      );
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<TaskPayloadResponse>()
+          {
+          }
+      );
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public int killPendingSegments(String dataSource, DateTime end)
   {

IndexingServiceClient.java:
@@ -61,4 +61,7 @@ public interface IndexingServiceClient
 
   @Nullable
   TaskStatusPlus getLastCompleteTask();
+
+  @Nullable
+  TaskPayloadResponse getTaskPayload(String taskId);
 }

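The new interface method pairs with the HTTP implementation earlier in the diff: callers look up a task's payload by id, and the result may be null (the method is @Nullable, and the Noop client later in this commit returns null). A small usage sketch, with the client and taskId variables assumed to be in scope:

    // Sketch only: defensive use of the new getTaskPayload call.
    final TaskPayloadResponse response = client.getTaskPayload(taskId);
    if (response != null && response.getPayload() instanceof ClientCompactQuery) {
      final ClientCompactQuery compact = (ClientCompactQuery) response.getPayload();
      // Intervals of the segments this in-flight compact task covers.
      compact.getSegments().forEach(segment -> System.out.println(segment.getInterval()));
    }
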
TaskPayloadResponse.java (new file):
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TaskPayloadResponse
+{
+  private final String task;
+  private final ClientQuery payload;
+
+  @JsonCreator
+  public TaskPayloadResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("payload") final ClientQuery payload
+  )
+  {
+    this.task = task;
+    this.payload = payload;
+  }
+
+  @JsonProperty
+  public String getTask()
+  {
+    return task;
+  }
+
+  @JsonProperty
+  public ClientQuery getPayload()
+  {
+    return payload;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskPayloadResponse{" +
+           "task='" + task + '\'' +
+           ", payload=" + payload +
+           '}';
+  }
+}

CompactionSegmentSearchPolicy.java:
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,6 +37,7 @@ public interface CompactionSegmentSearchPolicy
    */
   CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   );
 }

DruidCoordinatorSegmentCompactor.java:
@@ -21,9 +21,13 @@ package org.apache.druid.server.coordinator.helper;
 
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.client.indexing.ClientCompactQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -32,10 +36,12 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegmentUtils;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -77,23 +83,46 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
     Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
         .stream()
         .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-    final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks(
+    final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
         indexingServiceClient.getRunningTasks(),
         indexingServiceClient.getPendingTasks(),
         indexingServiceClient.getWaitingTasks()
     );
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    // dataSource -> list of intervals of compact tasks
+    final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
+    for (TaskStatusPlus status : compactTasks) {
+      final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
+      if (response == null) {
+        throw new ISE("WTH? got a null paylord from overlord for task[%s]", status.getId());
+      }
+      if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
+        final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
+        final Interval interval = JodaUtils.umbrellaInterval(
+            compactQuery.getSegments()
+                        .stream()
+                        .map(DataSegment::getInterval)
+                        .sorted(Comparators.intervalsByStartThenEnd())
+                        .collect(Collectors.toList())
+        );
+        compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+      } else {
+        throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
+      }
+    }
+
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
+
     final int compactionTaskCapacity = (int) Math.min(
         indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
         dynamicConfig.getMaxCompactionTaskSlots()
     );
-    final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ?
-        compactionTaskCapacity - numNonCompleteCompactionTasks :
+    final int numNonCompleteCompactionTasks = compactTasks.size();
+    final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
+        ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
         // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
         // This guarantees that at least one slot is available if
         // compaction is enabled and numRunningCompactTasks is 0.
-        Math.max(1, compactionTaskCapacity);
+        : Math.max(1, compactionTaskCapacity);
     LOG.info(
         "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
         numAvailableCompactionTaskSlots,
@@ -117,7 +146,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
   }
 
   @SafeVarargs
-  private static int findNumNonCompleteCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
+  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
   {
     final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
     Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
@@ -132,8 +161,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
           // performance.
           return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
         })
-        .collect(Collectors.toList())
-        .size();
+        .collect(Collectors.toList());
   }
 
   private CoordinatorStats doRun(

NewestSegmentFirstIterator.java:
@@ -19,10 +19,12 @@
 
 package org.apache.druid.server.coordinator.helper;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -67,7 +69,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   NewestSegmentFirstIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
     this.compactionConfigs = compactionConfigs;
@@ -80,9 +83,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
 
       if (config != null && !timeline.isEmpty()) {
-        final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        if (searchInterval != null) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        final List<Interval> searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+        if (!searchIntervals.isEmpty()) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
         }
       }
     }
@@ -186,19 +189,22 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     CompactibleTimelineObjectHolderCursor(
         VersionedIntervalTimeline<String, DataSegment> timeline,
-        Interval totalIntervalToSearch
+        List<Interval> totalIntervalsToSearch
    )
    {
-      this.holders = timeline
-          .lookup(totalIntervalToSearch)
+      this.holders = totalIntervalsToSearch
           .stream()
+          .flatMap(interval -> timeline
+              .lookup(interval)
+              .stream()
              .filter(holder -> {
                final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
                final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
                return chunks.size() > 0
                       && partitionBytes > 0
-                      && totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval());
+                      && interval.contains(chunks.get(0).getObject().getInterval());
              })
+          )
          .collect(Collectors.toList());
    }
 
@@ -340,14 +346,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
    * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
    *
    * @param timeline   timeline of a dataSource
-   * @param skipOffset skipOFfset
+   * @param skipIntervals intervals to skip
    *
    * @return found interval to search or null if it's not found
    */
-  @Nullable
-  private static Interval findInitialSearchInterval(
+  private static List<Interval> findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
-      Period skipOffset
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
   )
   {
     Preconditions.checkArgument(timeline != null && !timeline.isEmpty(), "timeline should not be null or empty");
@@ -355,35 +361,118 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     final TimelineObjectHolder<String, DataSegment> first = Preconditions.checkNotNull(timeline.first(), "first");
     final TimelineObjectHolder<String, DataSegment> last = Preconditions.checkNotNull(timeline.last(), "last");
-
-    final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
-
-    final DateTime lookupStart = first.getInterval().getStart();
-    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
-    if (lookupStart.isBefore(lookupEnd)) {
+    final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
+        last.getInterval().getEnd(),
+        skipOffset,
+        skipIntervals
+    );
+
+    final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
+    final List<Interval> filteredInterval = filterSkipIntervals(totalInterval, fullSkipIntervals);
+    final List<Interval> searchIntervals = new ArrayList<>();
+
+    for (Interval lookupInterval : filteredInterval) {
       final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-          new Interval(lookupStart, lookupEnd)
+          new Interval(lookupInterval.getStart(), lookupInterval.getEnd())
      );
 
      final List<DataSegment> segments = holders
          .stream()
          .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
          .map(PartitionChunk::getObject)
-          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .filter(segment -> lookupInterval.contains(segment.getInterval()))
          .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
          .collect(Collectors.toList());
 
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return new Interval(
-            segments.get(0).getInterval().getStart(),
-            segments.get(segments.size() - 1).getInterval().getEnd()
-        );
-      }
-    } else {
-      return null;
+      if (!segments.isEmpty()) {
+        searchIntervals.add(
+            new Interval(
+                segments.get(0).getInterval().getStart(),
+                segments.get(segments.size() - 1).getInterval().getEnd()
+            )
+        );
+      }
    }
+
+    return searchIntervals;
+  }
+
+  @VisibleForTesting
+  static List<Interval> sortAndAddSkipIntervalFromLatest(
+      DateTime latest,
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
+  )
+  {
+    final List<Interval> nonNullSkipIntervals = skipIntervals == null
+        ? new ArrayList<>(1)
+        : new ArrayList<>(skipIntervals.size());
+
+    if (skipIntervals != null) {
+      final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals);
+      sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+      final List<Interval> overlapIntervals = new ArrayList<>();
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+
+      for (Interval interval : sortedSkipIntervals) {
+        if (interval.overlaps(skipFromLatest)) {
+          overlapIntervals.add(interval);
+        } else {
+          nonNullSkipIntervals.add(interval);
+        }
+      }
+
+      if (!overlapIntervals.isEmpty()) {
+        overlapIntervals.add(skipFromLatest);
+        nonNullSkipIntervals.add(JodaUtils.umbrellaInterval(overlapIntervals));
+      } else {
+        nonNullSkipIntervals.add(skipFromLatest);
+      }
+    } else {
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+      nonNullSkipIntervals.add(skipFromLatest);
+    }
+
+    return nonNullSkipIntervals;
+  }
+
+  /**
+   * Returns a list of intervals which are contained by totalInterval but don't ovarlap with skipIntervals.
+   *
+   * @param totalInterval total interval
+   * @param skipIntervals intervals to skip. This should be sorted by {@link Comparators#intervalsByStartThenEnd()}.
+   */
+  @VisibleForTesting
+  static List<Interval> filterSkipIntervals(Interval totalInterval, List<Interval> skipIntervals)
+  {
+    final List<Interval> filteredIntervals = new ArrayList<>(skipIntervals.size() + 1);
+
+    DateTime remainingStart = totalInterval.getStart();
+    DateTime remainingEnd = totalInterval.getEnd();
+    for (Interval skipInterval : skipIntervals) {
+      if (skipInterval.getStart().isBefore(remainingStart) && skipInterval.getEnd().isAfter(remainingStart)) {
+        remainingStart = skipInterval.getEnd();
+      } else if (skipInterval.getStart().isBefore(remainingEnd) && skipInterval.getEnd().isAfter(remainingEnd)) {
+        remainingEnd = skipInterval.getStart();
+      } else if (!remainingStart.isAfter(skipInterval.getStart()) && !remainingEnd.isBefore(skipInterval.getEnd())) {
+        filteredIntervals.add(new Interval(remainingStart, skipInterval.getStart()));
+        remainingStart = skipInterval.getEnd();
+      } else {
+        // Ignore this skipInterval
+        log.warn(
+            "skipInterval[%s] is not contained in remainingInterval[%s]",
+            skipInterval,
+            new Interval(remainingStart, remainingEnd)
+        );
+      }
+    }
+
+    if (!remainingStart.equals(remainingEnd)) {
+      filteredIntervals.add(new Interval(remainingStart, remainingEnd));
+    }
+
+    return filteredIntervals;
   }
 
   private static class QueueEntry

NewestSegmentFirstPolicy.java:
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,9 +35,10 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
   @Override
   public CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(compactionConfigs, dataSources);
+    return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals);
   }
 }

NoopIndexingServiceClient.java:
@@ -109,4 +109,10 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   {
     return null;
   }
+
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    return null;
+  }
 }

NewestSegmentFirstIteratorTest.java (new file):
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.helper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NewestSegmentFirstIteratorTest
+{
+  @Test
+  public void testFilterSkipIntervals()
+  {
+    final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
+    final List<Interval> expectedSkipIntervals = ImmutableList.of(
+        Intervals.of("2018-01-15/2018-03-02"),
+        Intervals.of("2018-07-23/2018-10-01"),
+        Intervals.of("2018-10-02/2018-12-25"),
+        Intervals.of("2018-12-31/2019-01-01")
+    );
+    final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
+        totalInterval,
+        Lists.newArrayList(
+            Intervals.of("2017-12-01/2018-01-15"),
+            Intervals.of("2018-03-02/2018-07-23"),
+            Intervals.of("2018-10-01/2018-10-02"),
+            Intervals.of("2018-12-25/2018-12-31")
+        )
+    );
+
+    Assert.assertEquals(expectedSkipIntervals, skipIntervals);
+  }
+
+  @Test
+  public void testAddSkipIntervalFromLatestAndSort()
+  {
+    final List<Interval> expectedIntervals = ImmutableList.of(
+        Intervals.of("2018-12-24/2018-12-25"),
+        Intervals.of("2018-12-29/2019-01-01")
+    );
+    final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
+        DateTimes.of("2019-01-01"),
+        new Period(72, 0, 0, 0),
+        ImmutableList.of(
+            Intervals.of("2018-12-30/2018-12-31"),
+            Intervals.of("2018-12-24/2018-12-25")
+        )
+    );
+
+    Assert.assertEquals(expectedIntervals, fullSkipIntervals);
+  }
+}

NewestSegmentFirstPolicyTest.java:
@@ -78,7 +78,8 @@ public class NewestSegmentFirstPolicyTest
             new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
             new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -102,7 +103,8 @@ public class NewestSegmentFirstPolicyTest
             new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
             new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -178,7 +180,8 @@ public class NewestSegmentFirstPolicyTest
             // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
             new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -211,7 +214,8 @@ public class NewestSegmentFirstPolicyTest
             // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
             new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -258,7 +262,8 @@ public class NewestSegmentFirstPolicyTest
                 DEFAULT_NUM_SEGMENTS_PER_SHARD
             )
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -313,7 +318,8 @@ public class NewestSegmentFirstPolicyTest
                 80
             )
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -392,7 +398,8 @@ public class NewestSegmentFirstPolicyTest
                 150
             )
         )
-        )
+        ),
+        Collections.emptyMap()
    );
 
    Assert.assertFalse(iterator.hasNext());
@@ -416,7 +423,8 @@ public class NewestSegmentFirstPolicyTest
             new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
             new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
         )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -449,7 +457,8 @@ public class NewestSegmentFirstPolicyTest
                 1
             )
         )
-        )
+        ),
+        Collections.emptyMap()
    );
 
    Assert.assertFalse(iterator.hasNext());
@@ -481,7 +490,8 @@ public class NewestSegmentFirstPolicyTest
    );
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
    );
 
    final List<DataSegment> expectedSegmentsToCompact = timeline
@@ -510,7 +520,8 @@ public class NewestSegmentFirstPolicyTest
 
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
    );
 
    Assert.assertFalse(iterator.hasNext());
@@ -530,12 +541,53 @@ public class NewestSegmentFirstPolicyTest
 
    final CompactionSegmentIterator iterator = policy.reset(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
    );
 
    Assert.assertFalse(iterator.hasNext());
  }
 
+  @Test
+  public void testWithSkipIntervals()
+  {
+    final Period segmentPeriod = new Period("PT1H");
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
+                new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
+            )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            ImmutableList.of(
+                Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"),
+                Intervals.of("2017-11-15T00:00:00/2017-11-15T20:00:00"),
+                Intervals.of("2017-11-13T00:00:00/2017-11-14T01:00:00")
+            )
+        )
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-15T20:00:00/2017-11-15T21:00:00"),
+        Intervals.of("2017-11-15T23:00:00/2017-11-16T00:00:00"),
+        false
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-14T01:00:00/2017-11-14T02:00:00"),
+        Intervals.of("2017-11-14T23:00:00/2017-11-15T00:00:00"),
+        true
+    );
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,