getTotalWorkerCapacity();
+ /**
+ * Checks if compaction supervisors are enabled on the Overlord.
+ * When this returns true, the Coordinator does not run the CompactSegments duty.
+ *
+ * API: {@code /druid/indexer/v1/compaction/isSupervisorEnabled}
+ */
+ ListenableFuture<Boolean> isCompactionSupervisorEnabled();
+
+ /**
+ * Gets the number of bytes yet to be compacted for the given datasource.
+ *
+ * API: {@code /druid/indexer/v1/compaction/progress}
+ */
+ ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource);
+
+ /**
+ * Gets the latest compaction snapshots of one or all datasources.
+ *
+ * API: {@code /druid/indexer/v1/compaction/status}
+ *
+ * @param dataSource If passed as non-null, then the returned list contains only
+ * the snapshot for this datasource.
+ */
+ ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource);
+
/**
* Returns a copy of this client with a different retry policy.
*/
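The three new client methods are consumed on the Coordinator side. A minimal usage sketch follows; the wrapper method, variable names and the datasource name are illustrative, not part of this change.

// Hypothetical helper showing how a caller might consume the new APIs.
// Checked exceptions from Future.get() are propagated for brevity.
static void printCompactionState(OverlordClient overlordClient) throws Exception
{
  // True when compaction supervisors run on the Overlord; the Coordinator
  // then skips its own CompactSegments duty.
  Boolean supervisorEnabled = overlordClient.isCompactionSupervisorEnabled().get();

  // Bytes still awaiting compaction for one datasource (name is illustrative)
  Long pendingBytes = overlordClient.getBytesAwaitingCompaction("wikipedia").get();

  // Snapshots for all datasources (pass a datasource name to get just one)
  List<AutoCompactionSnapshot> snapshots = overlordClient.getCompactionSnapshots(null).get();

  System.out.println(supervisorEnabled + ", " + pendingBytes + ", " + snapshots.size());
}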
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 3e3d86ca5f2..2a48e0ed692 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -45,6 +45,7 @@ import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
@@ -312,6 +313,52 @@ public class OverlordClientImpl implements OverlordClient
);
}
+ @Override
+ public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
+ {
+ final StringBuilder pathBuilder = new StringBuilder("/druid/indexer/v1/compaction/status");
+ if (dataSource != null && !dataSource.isEmpty()) {
+ pathBuilder.append("?").append("dataSource=").append(dataSource);
+ }
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, pathBuilder.toString()),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(
+ jsonMapper,
+ holder.getContent(),
+ new TypeReference<List<AutoCompactionSnapshot>>() {}
+ )
+ );
+ }
+
+ @Override
+ public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
+ {
+ final String path = "/druid/indexer/v1/compaction/progress?dataSource=" + dataSource;
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, path),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), Long.class)
+ );
+ }
+
+ @Override
+ public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
+ {
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/isSupervisorEnabled"),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), Boolean.class)
+ );
+ }
+
@Override
public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
new file mode 100644
index 00000000000..21f6cafbfaf
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.guava.Comparators;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * Base implementation of {@link CompactionCandidateSearchPolicy} that can have
+ * a {@code priorityDatasource}.
+ */
+public abstract class BaseCandidateSearchPolicy
+ implements CompactionCandidateSearchPolicy, Comparator<CompactionCandidate>
+{
+ private final String priorityDatasource;
+ private final Comparator<CompactionCandidate> comparator;
+
+ protected BaseCandidateSearchPolicy(@Nullable String priorityDatasource)
+ {
+ this.priorityDatasource = priorityDatasource;
+ if (priorityDatasource == null || priorityDatasource.isEmpty()) {
+ this.comparator = getSegmentComparator();
+ } else {
+ this.comparator = Comparators.alwaysFirst(priorityDatasource)
+ .onResultOf(CompactionCandidate::getDataSource)
+ .thenComparing(getSegmentComparator());
+ }
+ }
+
+ /**
+ * The candidates of this datasource are prioritized over all others.
+ */
+ @Nullable
+ @JsonProperty
+ public final String getPriorityDatasource()
+ {
+ return priorityDatasource;
+ }
+
+ @Override
+ public final int compare(CompactionCandidate o1, CompactionCandidate o2)
+ {
+ return comparator.compare(o1, o2);
+ }
+
+ @Override
+ public boolean isEligibleForCompaction(
+ CompactionCandidate candidate,
+ CompactionStatus currentCompactionStatus,
+ CompactionTaskStatus latestTaskStatus
+ )
+ {
+ return true;
+ }
+
+ /**
+ * Compares two compaction candidates to determine the
+ * order in which segments and intervals should be picked for compaction.
+ */
+ protected abstract Comparator<CompactionCandidate> getSegmentComparator();
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BaseCandidateSearchPolicy that = (BaseCandidateSearchPolicy) o;
+ return Objects.equals(this.priorityDatasource, that.priorityDatasource);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(priorityDatasource);
+ }
+}
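As a sketch of the extension point, a hypothetical subclass (not part of this change) only needs to supply the segment comparator; priority-datasource handling is inherited from the base class.

package org.apache.druid.server.compaction;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.util.Comparator;

// Hypothetical policy that picks the candidate with the most bytes first.
public class LargestBytesFirstPolicy extends BaseCandidateSearchPolicy
{
  @JsonCreator
  public LargestBytesFirstPolicy(
      @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
  )
  {
    super(priorityDatasource);
  }

  @Override
  protected Comparator<CompactionCandidate> getSegmentComparator()
  {
    // Larger candidates first; the priority datasource (if any) still wins overall
    return Comparator.comparingLong(CompactionCandidate::getTotalBytes).reversed();
  }
}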
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
similarity index 55%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
rename to server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index 27ce9beab81..4cd9b22df81 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -17,53 +17,42 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
+package org.apache.druid.server.compaction;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
-import java.util.Collections;
+import javax.annotation.Nullable;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.stream.Collectors;
/**
- * List of segments to compact.
+ * Non-empty list of segments of a datasource being considered for compaction.
+ * A candidate typically contains all the segments of a single time chunk.
*/
-public class SegmentsToCompact
+public class CompactionCandidate
{
- private static final SegmentsToCompact EMPTY_INSTANCE = new SegmentsToCompact();
-
private final List<DataSegment> segments;
private final Interval umbrellaInterval;
+ private final String dataSource;
private final long totalBytes;
private final int numIntervals;
- static SegmentsToCompact empty()
- {
- return EMPTY_INSTANCE;
- }
+ private final CompactionStatus currentStatus;
- public static SegmentsToCompact from(List segments)
+ public static CompactionCandidate from(List<DataSegment> segments)
{
if (segments == null || segments.isEmpty()) {
- return empty();
+ throw InvalidInput.exception("Segments to compact must be non-empty");
} else {
- return new SegmentsToCompact(segments);
+ return new CompactionCandidate(segments, null);
}
}
- private SegmentsToCompact()
- {
- this.segments = Collections.emptyList();
- this.totalBytes = 0L;
- this.numIntervals = 0;
- this.umbrellaInterval = null;
- }
-
- private SegmentsToCompact(List segments)
+ private CompactionCandidate(List<DataSegment> segments, @Nullable CompactionStatus currentStatus)
{
this.segments = segments;
this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
@@ -71,53 +60,72 @@ public class SegmentsToCompact
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
this.numIntervals = (int) segments.stream().map(DataSegment::getInterval).distinct().count();
+ this.dataSource = segments.get(0).getDataSource();
+ this.currentStatus = currentStatus;
}
+ /**
+ * @return Non-empty list of segments that make up this candidate.
+ */
public List<DataSegment> getSegments()
{
return segments;
}
- public DataSegment getFirst()
- {
- if (segments.isEmpty()) {
- throw new NoSuchElementException("No segment to compact");
- } else {
- return segments.get(0);
- }
- }
-
- public boolean isEmpty()
- {
- return segments.isEmpty();
- }
-
public long getTotalBytes()
{
return totalBytes;
}
- public int size()
+ public int numSegments()
{
return segments.size();
}
+ /**
+ * Umbrella interval of all the segments in this candidate. This typically
+ * corresponds to a single time chunk in the segment timeline.
+ */
public Interval getUmbrellaInterval()
{
return umbrellaInterval;
}
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
public CompactionStatistics getStats()
{
- return CompactionStatistics.create(totalBytes, size(), numIntervals);
+ return CompactionStatistics.create(totalBytes, numSegments(), numIntervals);
+ }
+
+ /**
+ * Current compaction status of the time chunk corresponding to this candidate.
+ */
+ @Nullable
+ public CompactionStatus getCurrentStatus()
+ {
+ return currentStatus;
+ }
+
+ /**
+ * Creates a copy of this CompactionCandidate object with the given status.
+ */
+ public CompactionCandidate withCurrentStatus(CompactionStatus status)
+ {
+ return new CompactionCandidate(this.segments, status);
}
@Override
public String toString()
{
return "SegmentsToCompact{" +
- "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+ "datasource=" + dataSource +
+ ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", totalSize=" + totalBytes +
+ ", currentStatus=" + currentStatus +
'}';
}
}
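A small sketch of how the renamed class is used (same-package code is assumed, since the status factory methods are package-private; the reason text is illustrative):

// Wrap the segments of one time chunk and annotate them with a computed status.
static CompactionCandidate markAsSkipped(List<DataSegment> segmentsOfTimeChunk)
{
  // Throws InvalidInput if the list is null or empty
  CompactionCandidate candidate = CompactionCandidate.from(segmentsOfTimeChunk);
  return candidate.withCurrentStatus(
      CompactionStatus.skipped("example reason: interval was recently compacted")
  );
}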
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
new file mode 100644
index 00000000000..cf0e016f054
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+
+import java.util.Comparator;
+
+/**
+ * Policy used by {@link CompactSegments} duty to pick segments for compaction.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class)
+})
+public interface CompactionCandidateSearchPolicy extends Comparator<CompactionCandidate>
+{
+ /**
+ * Compares two compaction candidates to determine the
+ * order in which segments and intervals should be picked for compaction.
+ */
+ @Override
+ int compare(CompactionCandidate o1, CompactionCandidate o2);
+
+ /**
+ * Checks if the given {@link CompactionCandidate} is eligible for compaction
+ * in the current iteration. A policy may implement this method to skip
+ * compacting intervals or segments that do not fulfil some required criteria.
+ */
+ boolean isEligibleForCompaction(
+ CompactionCandidate candidate,
+ CompactionStatus currentCompactionStatus,
+ CompactionTaskStatus latestTaskStatus
+ );
+}
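A minimal sketch (all names are illustrative) of how a caller can apply a policy: the policy itself is the comparator that orders candidates, and ineligible candidates are dropped before queuing.

static PriorityQueue<CompactionCandidate> orderEligibleCandidates(
    CompactionCandidateSearchPolicy policy,
    List<CompactionCandidate> candidates
)
{
  // The policy decides the pick order
  PriorityQueue<CompactionCandidate> queue = new PriorityQueue<>(policy);
  for (CompactionCandidate candidate : candidates) {
    // Statuses are omitted in this simplified sketch; real callers pass the
    // current CompactionStatus and the latest CompactionTaskStatus
    if (policy.isEligibleForCompaction(candidate, null, null)) {
      queue.add(candidate);
    }
  }
  return queue;
}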
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
new file mode 100644
index 00000000000..6e8ec6b4b11
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.client.indexing.TaskStatusResponse;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Simulates runs of auto-compaction duty to obtain the expected list of
+ * compaction tasks that would be submitted by the actual compaction duty.
+ */
+public class CompactionRunSimulator
+{
+ private final CompactionStatusTracker statusTracker;
+ private final OverlordClient readOnlyOverlordClient;
+
+ public CompactionRunSimulator(
+ CompactionStatusTracker statusTracker,
+ OverlordClient overlordClient
+ )
+ {
+ this.statusTracker = statusTracker;
+ this.readOnlyOverlordClient = new ReadOnlyOverlordClient(overlordClient);
+ }
+
+ /**
+ * Simulates a run of the compact segments duty with the given compaction config
+ * assuming unlimited compaction task slots.
+ */
+ public CompactionSimulateResult simulateRunWithConfig(
+ DruidCompactionConfig compactionConfig,
+ Map<String, SegmentTimeline> datasourceTimelines
+ )
+ {
+ final Table compactedIntervals
+ = Table.withColumnNames("dataSource", "interval", "numSegments", "bytes");
+ final Table runningIntervals
+ = Table.withColumnNames("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact");
+ final Table queuedIntervals
+ = Table.withColumnNames("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact");
+ final Table skippedIntervals
+ = Table.withColumnNames("dataSource", "interval", "numSegments", "bytes", "reasonToSkip");
+
+ // Add a read-only wrapper over the actual status tracker so that we can
+ // account for the active tasks
+ final CompactionStatusTracker simulationStatusTracker = new CompactionStatusTracker(null)
+ {
+ @Override
+ public CompactionStatus computeCompactionStatus(
+ CompactionCandidate candidate,
+ DataSourceCompactionConfig config,
+ CompactionCandidateSearchPolicy searchPolicy
+ )
+ {
+ return statusTracker.computeCompactionStatus(candidate, config, searchPolicy);
+ }
+
+ @Override
+ public void onCompactionStatusComputed(
+ CompactionCandidate candidateSegments,
+ DataSourceCompactionConfig config
+ )
+ {
+ final CompactionStatus status = candidateSegments.getCurrentStatus();
+ if (status.getState() == CompactionStatus.State.COMPLETE) {
+ compactedIntervals.addRow(
+ createRow(candidateSegments, null, null)
+ );
+ } else if (status.getState() == CompactionStatus.State.RUNNING) {
+ runningIntervals.addRow(
+ createRow(candidateSegments, ClientCompactionTaskQueryTuningConfig.from(config), status.getReason())
+ );
+ } else if (status.getState() == CompactionStatus.State.SKIPPED) {
+ skippedIntervals.addRow(
+ createRow(candidateSegments, null, status.getReason())
+ );
+ }
+ }
+
+ @Override
+ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCandidate candidateSegments)
+ {
+ // Add a row for each task in order of submission
+ final CompactionStatus status = candidateSegments.getCurrentStatus();
+ queuedIntervals.addRow(
+ createRow(candidateSegments, taskPayload.getTuningConfig(), status.getReason())
+ );
+ }
+ };
+
+ // Unlimited task slots to ensure that simulator does not skip any interval
+ final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig(
+ new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
+ );
+
+ final CoordinatorRunStats stats = new CoordinatorRunStats();
+ new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
+ configWithUnlimitedTaskSlots,
+ datasourceTimelines,
+ stats
+ );
+
+ final Map<CompactionStatus.State, Table> compactionStates = new HashMap<>();
+ if (!compactedIntervals.isEmpty()) {
+ compactionStates.put(CompactionStatus.State.COMPLETE, compactedIntervals);
+ }
+ if (!runningIntervals.isEmpty()) {
+ compactionStates.put(CompactionStatus.State.RUNNING, runningIntervals);
+ }
+ if (!queuedIntervals.isEmpty()) {
+ compactionStates.put(CompactionStatus.State.PENDING, queuedIntervals);
+ }
+ if (!skippedIntervals.isEmpty()) {
+ compactionStates.put(CompactionStatus.State.SKIPPED, skippedIntervals);
+ }
+
+ return new CompactionSimulateResult(compactionStates);
+ }
+
+ private Object[] createRow(
+ CompactionCandidate candidate,
+ ClientCompactionTaskQueryTuningConfig tuningConfig,
+ String reason
+ )
+ {
+ final List<Object> row = new ArrayList<>();
+ row.add(candidate.getDataSource());
+ row.add(candidate.getUmbrellaInterval());
+ row.add(candidate.numSegments());
+ row.add(candidate.getTotalBytes());
+ if (tuningConfig != null) {
+ row.add(CompactSegments.findMaxNumTaskSlotsUsedByOneNativeCompactionTask(tuningConfig));
+ }
+ if (reason != null) {
+ row.add(reason);
+ }
+
+ return row.toArray(new Object[0]);
+ }
+
+ /**
+ * Read-only Overlord client used by the simulator: read APIs are delegated to the
+ * actual client, task submission and cancellation are no-ops, and unused APIs throw
+ * {@link UnsupportedOperationException}.
+ */
+ private static class ReadOnlyOverlordClient implements OverlordClient
+ {
+ final OverlordClient delegate;
+
+ ReadOnlyOverlordClient(OverlordClient delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+ @Nullable String state,
+ @Nullable String dataSource,
+ @Nullable Integer maxCompletedTasks
+ )
+ {
+ return delegate.taskStatuses(state, dataSource, maxCompletedTasks);
+ }
+
+ @Override
+ public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String> taskIds)
+ {
+ return delegate.taskStatuses(taskIds);
+ }
+
+ @Override
+ public ListenableFuture<TaskPayloadResponse> taskPayload(String taskId)
+ {
+ return delegate.taskPayload(taskId);
+ }
+
+ @Override
+ public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
+ {
+ return delegate.findLockedIntervals(lockFilterPolicies);
+ }
+
+ @Override
+ public ListenableFuture<IndexingTotalWorkerCapacityInfo> getTotalWorkerCapacity()
+ {
+ // Unlimited worker capacity to ensure that simulator does not skip any interval
+ return Futures.immediateFuture(
+ new IndexingTotalWorkerCapacityInfo(Integer.MAX_VALUE, Integer.MAX_VALUE)
+ );
+ }
+
+ @Override
+ public ListenableFuture<Void> runTask(String taskId, Object taskObject)
+ {
+ return Futures.immediateVoidFuture();
+ }
+
+ @Override
+ public ListenableFuture<Void> cancelTask(String taskId)
+ {
+ return Futures.immediateVoidFuture();
+ }
+
+ // Unsupported methods as these are not used by the CompactionScheduler / CompactSegments duty
+
+ @Override
+ public ListenableFuture<URI> findCurrentLeader()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<List<IndexingWorkerInfo>> getWorkers()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
+ {
+ return this;
+ }
+ }
+}
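A hedged usage sketch of the simulator (wiring and names are illustrative): it reuses the real CompactSegments duty but never submits tasks, since the wrapped client no-ops runTask.

static CompactionSimulateResult simulate(
    CompactionStatusTracker statusTracker,
    OverlordClient overlordClient,
    DruidCompactionConfig compactionConfig,
    Map<String, SegmentTimeline> datasourceTimelines
)
{
  CompactionRunSimulator simulator = new CompactionRunSimulator(statusTracker, overlordClient);
  // The result maps each CompactionStatus.State to a table of affected intervals
  return simulator.simulateRunWithConfig(compactionConfig, datasourceTimelines);
}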
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionSegmentIterator.java
similarity index 56%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
rename to server/src/main/java/org/apache/druid/server/compaction/CompactionSegmentIterator.java
index cc5f4f59d85..295aa2881ad 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionSegmentIterator.java
@@ -17,27 +17,27 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
-
-import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.server.coordinator.duty.CompactSegments;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
+package org.apache.druid.server.compaction;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
/**
- * Segment searching policy used by {@link CompactSegments}.
+ * Iterator over compactible segments.
*/
-public interface CompactionSegmentSearchPolicy
+public interface CompactionSegmentIterator extends Iterator<CompactionCandidate>
{
/**
- * Creates an iterator that returns compactible segments.
+ * List of candidate segments that are already compacted and do not need to be
+ * compacted again. None of these segments are returned by {@link #next()}.
*/
- CompactionSegmentIterator createIterator(
- Map<String, DataSourceCompactionConfig> compactionConfigs,
- Map<String, SegmentTimeline> dataSources,
- Map<String, List<Interval>> skipIntervals
- );
+ List<CompactionCandidate> getCompactedSegments();
+
+ /**
+ * List of candidate segments that have been skipped for compaction as they
+ * cannot be compacted due to some reason. None of these segments are returned
+ * by {@link #next()}.
+ */
+ List<CompactionCandidate> getSkippedSegments();
+
}
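A sketch of how a caller might drain the iterator (variable names are illustrative): next() yields only candidates that still need work, while compacted and skipped candidates are reported separately.

static void drain(CompactionSegmentIterator iterator)
{
  while (iterator.hasNext()) {
    CompactionCandidate candidate = iterator.next();
    // e.g. submit a compaction task covering candidate.getUmbrellaInterval()
    System.out.println("needs compaction: " + candidate.getUmbrellaInterval());
  }
  List<CompactionCandidate> alreadyCompacted = iterator.getCompactedSegments();
  List<CompactionCandidate> skipped = iterator.getSkippedSegments();
  System.out.println(alreadyCompacted.size() + " compacted, " + skipped.size() + " skipped");
}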
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java
new file mode 100644
index 00000000000..7a48ccf0e5b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+public class CompactionSimulateResult
+{
+ private final Map<CompactionStatus.State, Table> compactionStates;
+
+ @JsonCreator
+ public CompactionSimulateResult(
+ @JsonProperty("compactionStates") Map compactionStates
+ )
+ {
+ this.compactionStates = compactionStates;
+ }
+
+ @JsonProperty
+ public Map<CompactionStatus.State, Table> getCompactionStates()
+ {
+ return compactionStates;
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
similarity index 96%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
rename to server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
index 6997dec47c0..23f1b7fe9ef 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
+package org.apache.druid.server.compaction;
/**
* Used to track statistics for segments in different states of compaction.
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
similarity index 70%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
rename to server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index fa053fb8d6a..9e9a199917a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -17,20 +17,22 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
+package org.apache.druid.server.compaction;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -44,11 +46,16 @@ import java.util.List;
import java.util.function.Function;
/**
- * Represents the status of compaction for a given list of candidate segments.
+ * Represents the status of compaction for a given {@link CompactionCandidate}.
*/
public class CompactionStatus
{
- private static final CompactionStatus COMPLETE = new CompactionStatus(true, null);
+ private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null);
+
+ public enum State
+ {
+ COMPLETE, PENDING, RUNNING, SKIPPED
+ }
/**
* List of checks performed to determine if compaction is already complete.
@@ -68,54 +75,133 @@ public class CompactionStatus
Evaluator::transformSpecFilterIsUpToDate
);
- private final boolean complete;
- private final String reasonToCompact;
+ private final State state;
+ private final String reason;
- private CompactionStatus(boolean complete, String reason)
+ private CompactionStatus(State state, String reason)
{
- this.complete = complete;
- this.reasonToCompact = reason;
+ this.state = state;
+ this.reason = reason;
}
public boolean isComplete()
{
- return complete;
+ return state == State.COMPLETE;
}
- public String getReasonToCompact()
+ public boolean isSkipped()
{
- return reasonToCompact;
+ return state == State.SKIPPED;
+ }
+
+ public String getReason()
+ {
+ return reason;
+ }
+
+ public State getState()
+ {
+ return state;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompactionStatus{" +
+ "state=" + state +
+ ", reason=" + reason +
+ '}';
}
private static CompactionStatus incomplete(String reasonFormat, Object... args)
{
- return new CompactionStatus(false, StringUtils.format(reasonFormat, args));
+ return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args));
}
- private static CompactionStatus completeIfEqual(String field, Object configured, Object current)
+ private static <T> CompactionStatus completeIfEqual(
+ String field,
+ T configured,
+ T current,
+ Function<T, String> stringFunction
+ )
{
if (configured == null || configured.equals(current)) {
return COMPLETE;
} else {
- return configChanged(field, configured, current);
+ return configChanged(field, configured, current, stringFunction);
}
}
- private static CompactionStatus configChanged(String field, Object configured, Object current)
+ private static <T> CompactionStatus configChanged(
+ String field,
+ T target,
+ T current,
+ Function<T, String> stringFunction
+ )
{
return CompactionStatus.incomplete(
- "Configured %s[%s] is different from current %s[%s]",
- field, configured, field, current
+ "'%s' mismatch: required[%s], current[%s]",
+ field,
+ target == null ? null : stringFunction.apply(target),
+ current == null ? null : stringFunction.apply(current)
);
}
+ private static String asString(Granularity granularity)
+ {
+ if (granularity == null) {
+ return null;
+ }
+ for (GranularityType type : GranularityType.values()) {
+ if (type.getDefaultGranularity().equals(granularity)) {
+ return type.toString();
+ }
+ }
+ return granularity.toString();
+ }
+
+ private static String asString(PartitionsSpec partitionsSpec)
+ {
+ if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
+ DimensionRangePartitionsSpec rangeSpec = (DimensionRangePartitionsSpec) partitionsSpec;
+ return StringUtils.format(
+ "'range' on %s with %,d rows",
+ rangeSpec.getPartitionDimensions(), rangeSpec.getTargetRowsPerSegment()
+ );
+ } else if (partitionsSpec instanceof HashedPartitionsSpec) {
+ HashedPartitionsSpec hashedSpec = (HashedPartitionsSpec) partitionsSpec;
+ return StringUtils.format(
+ "'hashed' on %s with %,d rows",
+ hashedSpec.getPartitionDimensions(), hashedSpec.getTargetRowsPerSegment()
+ );
+ } else if (partitionsSpec instanceof DynamicPartitionsSpec) {
+ DynamicPartitionsSpec dynamicSpec = (DynamicPartitionsSpec) partitionsSpec;
+ return StringUtils.format(
+ "'dynamic' with %,d rows",
+ dynamicSpec.getMaxRowsPerSegment()
+ );
+ } else {
+ return partitionsSpec.toString();
+ }
+ }
+
+ static CompactionStatus skipped(String reasonFormat, Object... args)
+ {
+ return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args));
+ }
+
+ static CompactionStatus running(String reasonForCompaction)
+ {
+ return new CompactionStatus(State.RUNNING, reasonForCompaction);
+ }
+
/**
* Determines the CompactionStatus of the given candidate segments by evaluating
* the {@link #CHECKS} one by one. If any check returns an incomplete status,
* further checks are not performed and the incomplete status is returned.
*/
- static CompactionStatus of(
- SegmentsToCompact candidateSegments,
+ static CompactionStatus compute(
+ CompactionCandidate candidateSegments,
DataSourceCompactionConfig config,
ObjectMapper objectMapper
)
@@ -149,23 +235,21 @@ public class CompactionStatus
{
private final ObjectMapper objectMapper;
private final DataSourceCompactionConfig compactionConfig;
- private final SegmentsToCompact candidateSegments;
+ private final CompactionCandidate candidateSegments;
private final CompactionState lastCompactionState;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
private final ClientCompactionTaskGranularitySpec existingGranularitySpec;
private final UserCompactionTaskGranularityConfig configuredGranularitySpec;
private Evaluator(
- SegmentsToCompact candidateSegments,
+ CompactionCandidate candidateSegments,
DataSourceCompactionConfig compactionConfig,
ObjectMapper objectMapper
)
{
- Preconditions.checkArgument(!candidateSegments.isEmpty(), "Empty candidates");
-
this.candidateSegments = candidateSegments;
this.objectMapper = objectMapper;
- this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
+ this.lastCompactionState = candidateSegments.getSegments().get(0).getLastCompactionState();
this.compactionConfig = compactionConfig;
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
@@ -182,7 +266,7 @@ public class CompactionStatus
private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
{
if (lastCompactionState == null) {
- return CompactionStatus.incomplete("Not compacted yet");
+ return CompactionStatus.incomplete("not compacted yet");
} else {
return COMPLETE;
}
@@ -196,7 +280,7 @@ public class CompactionStatus
if (allHaveSameCompactionState) {
return COMPLETE;
} else {
- return CompactionStatus.incomplete("Candidate segments have different last compaction states.");
+ return CompactionStatus.incomplete("segments have different last compaction states");
}
}
@@ -205,7 +289,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"partitionsSpec",
findPartitionsSpecFromConfig(tuningConfig),
- lastCompactionState.getPartitionsSpec()
+ lastCompactionState.getPartitionsSpec(),
+ CompactionStatus::asString
);
}
@@ -214,7 +299,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"indexSpec",
Configs.valueOrDefault(tuningConfig.getIndexSpec(), IndexSpec.DEFAULT),
- objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class)
+ objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class),
+ String::valueOf
);
}
@@ -239,15 +325,16 @@ public class CompactionStatus
);
if (needsCompaction) {
return CompactionStatus.incomplete(
- "Configured segmentGranularity[%s] does not align with segment intervals.",
- configuredSegmentGranularity
+ "segmentGranularity: segments do not align with target[%s]",
+ asString(configuredSegmentGranularity)
);
}
} else {
return CompactionStatus.configChanged(
"segmentGranularity",
configuredSegmentGranularity,
- existingSegmentGranularity
+ existingSegmentGranularity,
+ CompactionStatus::asString
);
}
@@ -262,7 +349,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"rollup",
configuredGranularitySpec.isRollup(),
- existingGranularitySpec == null ? null : existingGranularitySpec.isRollup()
+ existingGranularitySpec == null ? null : existingGranularitySpec.isRollup(),
+ String::valueOf
);
}
}
@@ -275,7 +363,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"queryGranularity",
configuredGranularitySpec.getQueryGranularity(),
- existingGranularitySpec == null ? null : existingGranularitySpec.getQueryGranularity()
+ existingGranularitySpec == null ? null : existingGranularitySpec.getQueryGranularity(),
+ CompactionStatus::asString
);
}
}
@@ -289,7 +378,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"dimensionsSpec",
compactionConfig.getDimensionsSpec().getDimensions(),
- existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions()
+ existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(),
+ String::valueOf
);
}
}
@@ -309,8 +399,9 @@ public class CompactionStatus
if (existingMetricsSpec == null || !Arrays.deepEquals(configuredMetricsSpec, existingMetricsSpec)) {
return CompactionStatus.configChanged(
"metricsSpec",
- Arrays.toString(configuredMetricsSpec),
- Arrays.toString(existingMetricsSpec)
+ configuredMetricsSpec,
+ existingMetricsSpec,
+ Arrays::toString
);
} else {
return COMPLETE;
@@ -330,7 +421,8 @@ public class CompactionStatus
return CompactionStatus.completeIfEqual(
"transformSpec filter",
compactionConfig.getTransformSpec().getFilter(),
- existingTransformSpec == null ? null : existingTransformSpec.getFilter()
+ existingTransformSpec == null ? null : existingTransformSpec.getFilter(),
+ String::valueOf
);
}
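For illustration (same-package code assumed, since the factory methods are package-private; the numbers are made up), a status now pairs a State with a reason string built through the new helpers:

static CompactionStatus exampleSkippedStatus()
{
  // A SKIPPED status with a formatted reason, similar to what the status tracker produces;
  // exampleSkippedStatus() and the byte counts are hypothetical
  return CompactionStatus.skipped(
      "'inputSegmentSize' exceeded: Total segment size[%d] is larger than allowed inputSegmentSize[%d]",
      2_000_000_000L, 1_000_000_000L
  );
}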
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
new file mode 100644
index 00000000000..ab7ddbbb91a
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks status of recently submitted compaction tasks. Can be used by a segment
+ * search policy to skip an interval if it has been recently compacted or if it
+ * keeps failing repeatedly.
+ */
+public class CompactionStatusTracker
+{
+ private static final Duration MAX_STATUS_RETAIN_DURATION = Duration.standardHours(12);
+
+ private final ObjectMapper objectMapper;
+ private final ConcurrentHashMap<String, DatasourceStatus> datasourceStatuses
+ = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, CompactionCandidate> submittedTaskIdToSegments
+ = new ConcurrentHashMap<>();
+
+ @Inject
+ public CompactionStatusTracker(ObjectMapper objectMapper)
+ {
+ this.objectMapper = objectMapper;
+ }
+
+ public void stop()
+ {
+ datasourceStatuses.clear();
+ }
+
+ public void removeDatasource(String datasource)
+ {
+ datasourceStatuses.remove(datasource);
+ }
+
+ public CompactionTaskStatus getLatestTaskStatus(CompactionCandidate candidates)
+ {
+ return datasourceStatuses
+ .getOrDefault(candidates.getDataSource(), DatasourceStatus.EMPTY)
+ .intervalToTaskStatus
+ .get(candidates.getUmbrellaInterval());
+ }
+
+ /**
+ * Set of submitted compaction task IDs which have not been marked completed
+ * via {@link #onTaskFinished} yet.
+ */
+ public Set<String> getSubmittedTaskIds()
+ {
+ return submittedTaskIdToSegments.keySet();
+ }
+
+ public CompactionStatus computeCompactionStatus(
+ CompactionCandidate candidate,
+ DataSourceCompactionConfig config,
+ CompactionCandidateSearchPolicy searchPolicy
+ )
+ {
+ final CompactionStatus compactionStatus = CompactionStatus.compute(candidate, config, objectMapper);
+ if (compactionStatus.isComplete()) {
+ return compactionStatus;
+ }
+
+ // Skip intervals that violate max allowed input segment size
+ final long inputSegmentSize = config.getInputSegmentSizeBytes();
+ if (candidate.getTotalBytes() > inputSegmentSize) {
+ return CompactionStatus.skipped(
+ "'inputSegmentSize' exceeded: Total segment size[%d] is larger than allowed inputSegmentSize[%d]",
+ candidate.getTotalBytes(), inputSegmentSize
+ );
+ }
+
+ // Skip intervals that already have a running task
+ final CompactionTaskStatus lastTaskStatus = getLatestTaskStatus(candidate);
+ if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.RUNNING) {
+ return CompactionStatus.skipped("Task for interval is already running");
+ }
+
+ // Skip intervals that have been filtered out by the policy
+ if (!searchPolicy.isEligibleForCompaction(candidate, compactionStatus, lastTaskStatus)) {
+ return CompactionStatus.skipped("Rejected by search policy");
+ }
+
+ return compactionStatus;
+ }
+
+ public void onCompactionStatusComputed(
+ CompactionCandidate candidateSegments,
+ DataSourceCompactionConfig config
+ )
+ {
+ // Nothing to do, used by simulator
+ }
+
+ public void onCompactionConfigUpdated(DruidCompactionConfig compactionConfig)
+ {
+ final Set<String> compactionEnabledDatasources = new HashSet<>();
+ if (compactionConfig.getCompactionConfigs() != null) {
+ compactionConfig.getCompactionConfigs().forEach(config -> {
+ getOrComputeDatasourceStatus(config.getDataSource())
+ .cleanupStaleTaskStatuses();
+
+ compactionEnabledDatasources.add(config.getDataSource());
+ });
+ }
+
+ // Clean up state for datasources where compaction has been disabled
+ final Set<String> allDatasources = new HashSet<>(datasourceStatuses.keySet());
+ allDatasources.forEach(datasource -> {
+ if (!compactionEnabledDatasources.contains(datasource)) {
+ datasourceStatuses.remove(datasource);
+ }
+ });
+ }
+
+ public void onTaskSubmitted(
+ ClientCompactionTaskQuery taskPayload,
+ CompactionCandidate candidateSegments
+ )
+ {
+ submittedTaskIdToSegments.put(taskPayload.getId(), candidateSegments);
+ getOrComputeDatasourceStatus(taskPayload.getDataSource())
+ .handleSubmittedTask(candidateSegments);
+ }
+
+ public void onTaskFinished(String taskId, TaskStatus taskStatus)
+ {
+ if (!taskStatus.isComplete()) {
+ return;
+ }
+
+ final CompactionCandidate candidateSegments = submittedTaskIdToSegments.remove(taskId);
+ if (candidateSegments == null) {
+ // Nothing to do since we don't know the corresponding datasource or interval
+ return;
+ }
+
+ final Interval compactionInterval = candidateSegments.getUmbrellaInterval();
+ getOrComputeDatasourceStatus(candidateSegments.getDataSource())
+ .handleCompletedTask(compactionInterval, taskStatus);
+ }
+
+ private DatasourceStatus getOrComputeDatasourceStatus(String datasource)
+ {
+ return datasourceStatuses.computeIfAbsent(datasource, ds -> new DatasourceStatus());
+ }
+
+ /**
+ * Contains compaction task status of intervals of a datasource.
+ */
+ private static class DatasourceStatus
+ {
+ static final DatasourceStatus EMPTY = new DatasourceStatus();
+
+ final ConcurrentHashMap<Interval, CompactionTaskStatus> intervalToTaskStatus
+ = new ConcurrentHashMap<>();
+
+ void handleCompletedTask(Interval compactionInterval, TaskStatus taskStatus)
+ {
+ final CompactionTaskStatus lastKnownStatus = intervalToTaskStatus.get(compactionInterval);
+ final DateTime now = DateTimes.nowUtc();
+
+ final CompactionTaskStatus updatedStatus;
+ if (taskStatus.isSuccess()) {
+ updatedStatus = new CompactionTaskStatus(TaskState.SUCCESS, now, 0);
+ } else if (lastKnownStatus == null || lastKnownStatus.getState().isSuccess()) {
+ // This is the first failure
+ updatedStatus = new CompactionTaskStatus(TaskState.FAILED, now, 1);
+ } else {
+ updatedStatus = new CompactionTaskStatus(
+ TaskState.FAILED,
+ now,
+ lastKnownStatus.getNumConsecutiveFailures() + 1
+ );
+ }
+ intervalToTaskStatus.put(compactionInterval, updatedStatus);
+ }
+
+ void handleSubmittedTask(CompactionCandidate candidateSegments)
+ {
+ final Interval interval = candidateSegments.getUmbrellaInterval();
+ final CompactionTaskStatus lastStatus = intervalToTaskStatus.get(interval);
+
+ final DateTime now = DateTimes.nowUtc();
+ if (lastStatus == null || !lastStatus.getState().isFailure()) {
+ intervalToTaskStatus.put(interval, new CompactionTaskStatus(TaskState.RUNNING, now, 0));
+ } else {
+ intervalToTaskStatus.put(
+ interval,
+ new CompactionTaskStatus(TaskState.RUNNING, now, lastStatus.getNumConsecutiveFailures())
+ );
+ }
+ }
+
+ void cleanupStaleTaskStatuses()
+ {
+ final DateTime now = DateTimes.nowUtc();
+
+ final Set<Interval> staleIntervals = new HashSet<>();
+ intervalToTaskStatus.forEach((interval, taskStatus) -> {
+ if (taskStatus.getUpdatedTime().plus(MAX_STATUS_RETAIN_DURATION).isBefore(now)) {
+ staleIntervals.add(interval);
+ }
+ });
+
+ staleIntervals.forEach(intervalToTaskStatus::remove);
+ }
+ }
+}
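A sketch of the tracker lifecycle as driven by the duty (the wrapper method and its parameter wiring are illustrative): a submitted task is recorded against its candidate, and the completion callback updates the per-interval failure count.

static void trackOneTask(
    CompactionStatusTracker tracker,
    ClientCompactionTaskQuery taskPayload,
    CompactionCandidate candidate,
    TaskStatus finalStatus
)
{
  // Marks the candidate's umbrella interval as RUNNING
  tracker.onTaskSubmitted(taskPayload, candidate);

  // Later, when the Overlord reports completion: success resets the failure
  // count, failure increments numConsecutiveFailures for the interval
  tracker.onTaskFinished(taskPayload.getId(), finalStatus);
}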
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java
new file mode 100644
index 00000000000..3431de706f6
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.indexer.TaskState;
+import org.joda.time.DateTime;
+
+public class CompactionTaskStatus
+{
+ private final TaskState state;
+ private final DateTime updatedTime;
+ private final int numConsecutiveFailures;
+
+ public CompactionTaskStatus(
+ TaskState state,
+ DateTime updatedTime,
+ int numConsecutiveFailures
+ )
+ {
+ this.state = state;
+ this.updatedTime = updatedTime;
+ this.numConsecutiveFailures = numConsecutiveFailures;
+ }
+
+ public TaskState getState()
+ {
+ return state;
+ }
+
+ public DateTime getUpdatedTime()
+ {
+ return updatedTime;
+ }
+
+ public int getNumConsecutiveFailures()
+ {
+ return numConsecutiveFailures;
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java b/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
similarity index 73%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java
rename to server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
index c086be3112b..0abbd00e975 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
+package org.apache.druid.server.compaction;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,6 +36,7 @@ import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.utils.CollectionUtils;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -59,67 +59,86 @@ import java.util.stream.Collectors;
/**
* Iterator over compactible segments of a datasource in order of specified priority.
*/
-public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
+public class DataSourceCompactibleSegmentIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class);
private final String dataSource;
- private final ObjectMapper objectMapper;
private final DataSourceCompactionConfig config;
- private final CompactionStatistics compactedSegmentStats = new CompactionStatistics();
- private final CompactionStatistics skippedSegmentStats = new CompactionStatistics();
+ private final CompactionStatusTracker statusTracker;
+ private final CompactionCandidateSearchPolicy searchPolicy;
+
+ private final List<CompactionCandidate> compactedSegments = new ArrayList<>();
+ private final List<CompactionCandidate> skippedSegments = new ArrayList<>();
// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
- private final Set<Interval> compactedIntervals = new HashSet<>();
+ private final Set<Interval> queuedIntervals = new HashSet<>();
- private final PriorityQueue<SegmentsToCompact> queue;
+ private final PriorityQueue<CompactionCandidate> queue;
public DataSourceCompactibleSegmentIterator(
DataSourceCompactionConfig config,
SegmentTimeline timeline,
List<Interval> skipIntervals,
- Comparator<SegmentsToCompact> segmentPriority,
- ObjectMapper objectMapper
+ CompactionCandidateSearchPolicy searchPolicy,
+ CompactionStatusTracker statusTracker
)
{
- this.objectMapper = objectMapper;
+ this.statusTracker = statusTracker;
this.config = config;
this.dataSource = config.getDataSource();
- this.queue = new PriorityQueue<>(segmentPriority);
+ this.searchPolicy = searchPolicy;
+ this.queue = new PriorityQueue<>(searchPolicy);
+
populateQueue(timeline, skipIntervals);
}
private void populateQueue(SegmentTimeline timeline, List<Interval> skipIntervals)
{
if (timeline != null) {
- Granularity configuredSegmentGranularity = null;
if (!timeline.isEmpty()) {
SegmentTimeline originalTimeline = null;
- if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
- String temporaryVersion = DateTimes.nowUtc().toString();
- Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
- configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity();
- // Create a new timeline to hold segments in the new configured segment granularity
- SegmentTimeline timelineWithConfiguredSegmentGranularity = new SegmentTimeline();
- Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+ if (config.getSegmentGranularity() != null) {
+ final Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(
+ Intervals.ETERNITY,
+ Partitions.ONLY_COMPLETE
+ );
+
+ // Skip compaction if any segment has partial-eternity interval
+ // See https://github.com/apache/druid/issues/13208
+ final List<DataSegment> partialEternitySegments = new ArrayList<>();
for (DataSegment segment : segments) {
- // Convert original segmentGranularity to new granularities bucket by configuredSegmentGranularity
- // For example, if the original is interval of 2020-01-28/2020-02-03 with WEEK granularity
- // and the configuredSegmentGranularity is MONTH, the segment will be split to two segments
- // of 2020-01/2020-02 and 2020-02/2020-03.
if (Intervals.ETERNITY.getStart().equals(segment.getInterval().getStart())
|| Intervals.ETERNITY.getEnd().equals(segment.getInterval().getEnd())) {
- // This is to prevent the coordinator from crashing as raised in https://github.com/apache/druid/issues/13208
- log.warn("Cannot compact datasource[%s] containing segments with partial-ETERNITY intervals", dataSource);
- return;
- }
- for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) {
- intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment);
+ partialEternitySegments.add(segment);
}
}
+ if (!partialEternitySegments.isEmpty()) {
+ CompactionCandidate candidatesWithStatus = CompactionCandidate.from(partialEternitySegments).withCurrentStatus(
+ CompactionStatus.skipped("Segments have partial-eternity intervals")
+ );
+ skippedSegments.add(candidatesWithStatus);
+ statusTracker.onCompactionStatusComputed(candidatesWithStatus, config);
+ return;
+ }
+
+ // Convert original segmentGranularity to new granularities bucket by configuredSegmentGranularity
+ // For example, if the original is interval of 2020-01-28/2020-02-03 with WEEK granularity
+ // and the configuredSegmentGranularity is MONTH, the segment will be split to two segments
+ // of 2020-01/2020-02 and 2020-02/2020-03.
+ final SegmentTimeline timelineWithConfiguredSegmentGranularity = new SegmentTimeline();
+ final Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
+ for (DataSegment segment : segments) {
+ for (Interval interval : config.getSegmentGranularity().getIterable(segment.getInterval())) {
+ intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>())
+ .add(segment);
+ }
+ }
+
+ final String temporaryVersion = DateTimes.nowUtc().toString();
for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) {
Interval interval = partitionsPerInterval.getKey();
int partitionNum = 0;
@@ -149,13 +168,7 @@ public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
- final List<Interval> searchIntervals = findInitialSearchInterval(
- dataSource,
- timeline,
- config.getSkipOffsetFromLatest(),
- configuredSegmentGranularity,
- skipIntervals
- );
+ final List<Interval> searchIntervals = findInitialSearchInterval(timeline, skipIntervals);
if (!searchIntervals.isEmpty()) {
findAndEnqueueSegmentsToCompact(
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
@@ -167,14 +180,16 @@ public class DataSourceCompactibleSegmentIterator implements Iterator
- public CompactionStatistics totalCompactedStatistics()
+ @Override
+ public List getCompactedSegments()
{
- return compactedSegmentStats;
+ return compactedSegments;
}
- public CompactionStatistics totalSkippedStatistics()
+ @Override
+ public List getSkippedSegments()
{
- return skippedSegmentStats;
+ return skippedSegments;
}
@Override
@@ -184,21 +199,13 @@ public class DataSourceCompactibleSegmentIterator implements Iterator
- final List resultSegments = entry.getSegments();
- Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
-
- return entry;
}
/**
@@ -296,96 +303,89 @@ public class DataSourceCompactibleSegmentIterator implements Iterator
final List segments = compactibleSegmentIterator.next();
+ if (CollectionUtils.isNullOrEmpty(segments)) {
+ continue;
+ }
- // Do not compact an interval which comprises of a single tombstone
+ // Do not compact an interval which contains a single tombstone
// If there are multiple tombstones in the interval, we may still want to compact them
if (segments.size() == 1 && segments.get(0).isTombstone()) {
continue;
}
- final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
- final Interval interval = candidates.getUmbrellaInterval();
-
- final CompactionStatus compactionStatus = CompactionStatus.of(candidates, config, objectMapper);
- if (!compactionStatus.isComplete()) {
- log.debug(
- "Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
- dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
- );
- }
+ final CompactionCandidate candidates = CompactionCandidate.from(segments);
+ final CompactionStatus compactionStatus
+ = statusTracker.computeCompactionStatus(candidates, config, searchPolicy);
+ final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
+ statusTracker.onCompactionStatusComputed(candidatesWithStatus, config);
if (compactionStatus.isComplete()) {
- compactedSegmentStats.increment(candidates.getStats());
- } else if (candidates.getTotalBytes() > inputSegmentSize) {
- skippedSegmentStats.increment(candidates.getStats());
- log.warn(
- "Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
- + " is larger than allowed inputSegmentSize[%d].",
- dataSource, interval, candidates.getTotalBytes(), inputSegmentSize
- );
- } else if (config.getGranularitySpec() != null
- && config.getGranularitySpec().getSegmentGranularity() != null) {
- if (compactedIntervals.contains(interval)) {
- // Skip these candidate segments as we have already compacted this interval
- } else {
- compactedIntervals.add(interval);
- queue.add(candidates);
- }
- } else {
- queue.add(candidates);
+ compactedSegments.add(candidatesWithStatus);
+ } else if (compactionStatus.isSkipped()) {
+ skippedSegments.add(candidatesWithStatus);
+ } else if (!queuedIntervals.contains(candidates.getUmbrellaInterval())) {
+ queue.add(candidatesWithStatus);
+ queuedIntervals.add(candidates.getUmbrellaInterval());
}
}
-
- log.debug("No more segments to compact for datasource[%s].", dataSource);
}
/**
* Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
- *
- * @param timeline timeline of a dataSource
- * @param skipIntervals intervals to skip
- *
- * @return found interval to search or null if it's not found
*/
private List findInitialSearchInterval(
- String dataSourceName,
SegmentTimeline timeline,
- Period skipOffset,
- Granularity configuredSegmentGranularity,
@Nullable List skipIntervals
)
{
+ final Period skipOffset = config.getSkipOffsetFromLatest();
Preconditions.checkArgument(timeline != null && !timeline.isEmpty(), "timeline should not be null or empty");
Preconditions.checkNotNull(skipOffset, "skipOffset");
final TimelineObjectHolder first = Preconditions.checkNotNull(timeline.first(), "first");
final TimelineObjectHolder last = Preconditions.checkNotNull(timeline.last(), "last");
- final List fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
+ final Interval latestSkipInterval = computeLatestSkipInterval(
+ config.getSegmentGranularity(),
last.getInterval().getEnd(),
- skipOffset,
- configuredSegmentGranularity,
- skipIntervals
+ skipOffset
);
+ final List allSkipIntervals
+ = sortAndAddSkipIntervalFromLatest(latestSkipInterval, skipIntervals);
// Collect stats for all skipped segments
- for (Interval skipInterval : fullSkipIntervals) {
+ for (Interval skipInterval : allSkipIntervals) {
final List segments = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
);
- skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats());
+ if (!CollectionUtils.isNullOrEmpty(segments)) {
+ final CompactionCandidate candidates = CompactionCandidate.from(segments);
+
+ final CompactionStatus reason;
+ if (candidates.getUmbrellaInterval().overlaps(latestSkipInterval)) {
+ reason = CompactionStatus.skipped("skip offset from latest[%s]", skipOffset);
+ } else {
+ reason = CompactionStatus.skipped("interval locked by another task");
+ }
+
+ final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(reason);
+ skippedSegments.add(candidatesWithStatus);
+ statusTracker.onCompactionStatusComputed(candidatesWithStatus, config);
+ }
}
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
- final List filteredInterval = filterSkipIntervals(totalInterval, fullSkipIntervals);
+ final List filteredInterval = filterSkipIntervals(totalInterval, allSkipIntervals);
final List searchIntervals = new ArrayList<>();
for (Interval lookupInterval : filteredInterval) {
if (Intervals.ETERNITY.getStart().equals(lookupInterval.getStart())
|| Intervals.ETERNITY.getEnd().equals(lookupInterval.getEnd())) {
- log.warn("Cannot compact datasource[%s] since interval[%s] coincides with ETERNITY.", dataSourceName, lookupInterval);
+ log.warn(
+ "Cannot compact datasource[%s] since interval[%s] coincides with ETERNITY.",
+ dataSource, lookupInterval
+ );
return Collections.emptyList();
}
final List segments = timeline
@@ -416,25 +416,30 @@ public class DataSourceCompactibleSegmentIterator implements Iterator
private static List sortAndAddSkipIntervalFromLatest(
- DateTime latest,
- Period skipOffset,
- Granularity configuredSegmentGranularity,
+ Interval skipFromLatest,
@Nullable List skipIntervals
)
{
final List nonNullSkipIntervals = skipIntervals == null
? new ArrayList<>(1)
: new ArrayList<>(skipIntervals.size());
- final Interval skipFromLatest;
- if (configuredSegmentGranularity != null) {
- DateTime skipFromLastest = new DateTime(latest, latest.getZone()).minus(skipOffset);
- DateTime skipOffsetBucketToSegmentGranularity = configuredSegmentGranularity.bucketStart(skipFromLastest);
- skipFromLatest = new Interval(skipOffsetBucketToSegmentGranularity, latest);
- } else {
- skipFromLatest = new Interval(skipOffset, latest);
- }
if (skipIntervals != null) {
final List sortedSkipIntervals = new ArrayList<>(skipIntervals);
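The removed block above is what computeLatestSkipInterval now encapsulates: subtract the skip offset from the latest segment end and, if a segment granularity is configured, snap the start of the skipped interval to that granularity's bucket. A minimal standalone sketch of that calculation follows; the helper name and sample values are illustrative only, not the new method's exact signature.

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

public class SkipIntervalSketch
{
  // Mirrors the logic removed from sortAndAddSkipIntervalFromLatest: without a configured
  // granularity the skipped interval is simply (latest - skipOffset, latest); with one,
  // the start is aligned to the enclosing granularity bucket.
  static Interval latestSkipInterval(Granularity segmentGranularity, DateTime latest, Period skipOffset)
  {
    if (segmentGranularity == null) {
      return new Interval(skipOffset, latest);
    }
    final DateTime skipFrom = latest.minus(skipOffset);
    return new Interval(segmentGranularity.bucketStart(skipFrom), latest);
  }

  public static void main(String[] args)
  {
    // latest = 2020-02-03, skipOffset = P1D, MONTH granularity:
    // skipFrom = 2020-02-02, bucket start = 2020-02-01, so 2020-02-01/2020-02-03 is skipped.
    System.out.println(latestSkipInterval(Granularities.MONTH, DateTimes.of("2020-02-03"), new Period("P1D")));
  }
}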
diff --git a/server/src/main/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicy.java
new file mode 100644
index 00000000000..7c440900ba7
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.guava.Comparators;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+
+/**
+ * Implementation of {@link CompactionCandidateSearchPolicy} that prioritizes
+ * intervals which have the latest data.
+ */
+public class NewestSegmentFirstPolicy extends BaseCandidateSearchPolicy
+{
+ @JsonCreator
+ public NewestSegmentFirstPolicy(
+ @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
+ )
+ {
+ super(priorityDatasource);
+ }
+
+ @Override
+ protected Comparator getSegmentComparator()
+ {
+ return (o1, o2) -> Comparators.intervalsByStartThenEnd()
+ .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval());
+ }
+}
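For reference, the comparator above simply reverses the interval ordering so that later data sorts first. A small illustrative sketch of that ordering on bare intervals; the sample intervals are made up for the example.

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import java.util.Comparator;

public class NewestFirstOrderingSketch
{
  public static void main(String[] args)
  {
    // Swapping the arguments (o2 before o1) makes the later interval compare as "smaller",
    // so the freshest data is picked for compaction before older data.
    final Comparator<Interval> newestFirst =
        (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2, o1);

    final Interval january = Intervals.of("2024-01-01/2024-02-01");
    final Interval february = Intervals.of("2024-02-01/2024-03-01");

    // Prints a negative number: february is ordered before january.
    System.out.println(newestFirst.compare(february, january));
  }
}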
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/compaction/PriorityBasedCompactionSegmentIterator.java
similarity index 70%
rename from server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java
rename to server/src/main/java/org/apache/druid/server/compaction/PriorityBasedCompactionSegmentIterator.java
index 33aea2a0451..a64c3b1e641 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/PriorityBasedCompactionSegmentIterator.java
@@ -17,45 +17,42 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.compact;
+package org.apache.druid.server.compaction;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
+import java.util.stream.Collectors;
/**
- * Implementation of {@link CompactionSegmentIterator} that returns segments in
- * order of their priority.
+ * Implementation of {@link CompactionSegmentIterator} that returns candidate
+ * segments in order of their priority.
*/
public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class);
- private final PriorityQueue queue;
+ private final PriorityQueue queue;
private final Map datasourceIterators;
public PriorityBasedCompactionSegmentIterator(
+ CompactionCandidateSearchPolicy searchPolicy,
Map compactionConfigs,
Map datasourceToTimeline,
Map> skipIntervals,
- Comparator segmentPriority,
- ObjectMapper objectMapper
+ CompactionStatusTracker statusTracker
)
{
- this.queue = new PriorityQueue<>(segmentPriority);
+ this.queue = new PriorityQueue<>(searchPolicy);
this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
compactionConfigs.forEach((datasource, config) -> {
if (config == null) {
@@ -73,8 +70,8 @@ public class PriorityBasedCompactionSegmentIterator implements CompactionSegment
compactionConfigs.get(datasource),
timeline,
skipIntervals.getOrDefault(datasource, Collections.emptyList()),
- segmentPriority,
- objectMapper
+ searchPolicy,
+ statusTracker
)
);
addNextItemForDatasourceToQueue(datasource);
@@ -82,21 +79,19 @@ public class PriorityBasedCompactionSegmentIterator implements CompactionSegment
}
@Override
- public Map totalCompactedStatistics()
+ public List getCompactedSegments()
{
- return CollectionUtils.mapValues(
- datasourceIterators,
- DataSourceCompactibleSegmentIterator::totalCompactedStatistics
- );
+ return datasourceIterators.values().stream().flatMap(
+ iterator -> iterator.getCompactedSegments().stream()
+ ).collect(Collectors.toList());
}
@Override
- public Map totalSkippedStatistics()
+ public List getSkippedSegments()
{
- return CollectionUtils.mapValues(
- datasourceIterators,
- DataSourceCompactibleSegmentIterator::totalSkippedStatistics
- );
+ return datasourceIterators.values().stream().flatMap(
+ iterator -> iterator.getSkippedSegments().stream()
+ ).collect(Collectors.toList());
}
@Override
@@ -106,19 +101,18 @@ public class PriorityBasedCompactionSegmentIterator implements CompactionSegment
}
@Override
- public SegmentsToCompact next()
+ public CompactionCandidate next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
- final SegmentsToCompact entry = queue.poll();
+ final CompactionCandidate entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
- Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty");
- addNextItemForDatasourceToQueue(entry.getFirst().getDataSource());
+ addNextItemForDatasourceToQueue(entry.getDataSource());
return entry;
}
@@ -126,9 +120,9 @@ public class PriorityBasedCompactionSegmentIterator implements CompactionSegment
{
final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName);
if (iterator.hasNext()) {
- final SegmentsToCompact segmentsToCompact = iterator.next();
- if (!segmentsToCompact.isEmpty()) {
- queue.add(segmentsToCompact);
+ final CompactionCandidate compactionCandidate = iterator.next();
+ if (compactionCandidate != null) {
+ queue.add(compactionCandidate);
}
}
}
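The new constructor takes the search policy and a CompactionStatusTracker instead of a raw comparator and ObjectMapper. A rough construction sketch follows; the empty maps are stand-ins for the real per-datasource inputs, and the assumption that CompactionStatusTracker can be built from an ObjectMapper is not confirmed by this diff.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator;

import java.util.Collections;

public class CompactionIteratorSketch
{
  public static void main(String[] args)
  {
    // Assumption: CompactionStatusTracker is constructible from an ObjectMapper;
    // its definition is outside this diff.
    final CompactionStatusTracker statusTracker = new CompactionStatusTracker(new ObjectMapper());

    // Empty inputs keep the sketch self-contained; real callers pass per-datasource
    // compaction configs, used-segment timelines and intervals to skip.
    final CompactionSegmentIterator iterator = new PriorityBasedCompactionSegmentIterator(
        new NewestSegmentFirstPolicy(null),
        Collections.emptyMap(),
        Collections.emptyMap(),
        Collections.emptyMap(),
        statusTracker
    );

    while (iterator.hasNext()) {
      // Candidates come back highest-priority first across all datasources.
      final CompactionCandidate candidate = iterator.next();
      System.out.println(candidate.getDataSource() + ": " + candidate.getUmbrellaInterval());
    }
  }
}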
diff --git a/server/src/main/java/org/apache/druid/server/compaction/Table.java b/server/src/main/java/org/apache/druid/server/compaction/Table.java
new file mode 100644
index 00000000000..5a620dacc8d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/Table.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A simple table POJO with any number of rows and specified column names.
+ * Used in {@link CompactionSimulateResult}.
+ */
+public class Table
+{
+ private final List columnNames;
+ private final List> rows = new ArrayList<>();
+
+ public static Table withColumnNames(String... columnNames)
+ {
+ return new Table(Arrays.asList(columnNames), null);
+ }
+
+ @JsonCreator
+ public Table(
+ @JsonProperty("columnNames") List columnNames,
+ @JsonProperty("rows") List> rows
+ )
+ {
+ this.columnNames = columnNames;
+ if (rows != null) {
+ this.rows.addAll(rows);
+ }
+ }
+
+ @JsonProperty
+ public List getColumnNames()
+ {
+ return columnNames;
+ }
+
+ @JsonProperty
+ public List> getRows()
+ {
+ return rows;
+ }
+
+ public void addRow(Object... values)
+ {
+ rows.add(Arrays.asList(values));
+ }
+
+ public boolean isEmpty()
+ {
+ return rows.isEmpty();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Table{" +
+ "columnNames=" + columnNames +
+ ", rows=" + rows +
+ '}';
+ }
+}
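Table is a plain Jackson-serializable POJO, so its use in simulate results is straightforward. A small illustrative example; the column names and rows below are made up, not the ones CompactionRunSimulator actually emits.

import org.apache.druid.server.compaction.Table;

public class TableSketch
{
  public static void main(String[] args)
  {
    final Table table = Table.withColumnNames("dataSource", "interval", "numSegments");
    table.addRow("wiki", "2024-01-01/2024-01-02", 12);
    table.addRow("wiki", "2024-01-02/2024-01-03", 7);

    // Serialized via Jackson this becomes {"columnNames":[...],"rows":[[...],[...]]}.
    System.out.println(table);
  }
}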
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
index d52d4e9eba0..d6fa4835b48 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
@@ -21,8 +21,9 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.server.coordinator.compact.CompactionStatistics;
+import org.apache.druid.server.compaction.CompactionStatistics;
import javax.validation.constraints.NotNull;
import java.util.Objects;
@@ -60,22 +61,22 @@ public class AutoCompactionSnapshot
public static Builder builder(String dataSource)
{
- return new Builder(dataSource, AutoCompactionScheduleStatus.RUNNING);
+ return new Builder(dataSource).withStatus(AutoCompactionScheduleStatus.RUNNING);
}
@JsonCreator
public AutoCompactionSnapshot(
- @JsonProperty @NotNull String dataSource,
- @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus,
- @JsonProperty long bytesAwaitingCompaction,
- @JsonProperty long bytesCompacted,
- @JsonProperty long bytesSkipped,
- @JsonProperty long segmentCountAwaitingCompaction,
- @JsonProperty long segmentCountCompacted,
- @JsonProperty long segmentCountSkipped,
- @JsonProperty long intervalCountAwaitingCompaction,
- @JsonProperty long intervalCountCompacted,
- @JsonProperty long intervalCountSkipped
+ @JsonProperty("dataSource") @NotNull String dataSource,
+ @JsonProperty("scheduleStatus") @NotNull AutoCompactionScheduleStatus scheduleStatus,
+ @JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction,
+ @JsonProperty("bytesCompacted") long bytesCompacted,
+ @JsonProperty("bytesSkipped") long bytesSkipped,
+ @JsonProperty("segmentCountAwaitingCompaction") long segmentCountAwaitingCompaction,
+ @JsonProperty("segmentCountCompacted") long segmentCountCompacted,
+ @JsonProperty("segmentCountSkipped") long segmentCountSkipped,
+ @JsonProperty("intervalCountAwaitingCompaction") long intervalCountAwaitingCompaction,
+ @JsonProperty("intervalCountCompacted") long intervalCountCompacted,
+ @JsonProperty("intervalCountSkipped") long intervalCountSkipped
)
{
this.dataSource = dataSource;
@@ -192,26 +193,26 @@ public class AutoCompactionSnapshot
public static class Builder
{
private final String dataSource;
- private final AutoCompactionScheduleStatus scheduleStatus;
+ private AutoCompactionScheduleStatus scheduleStatus;
private final CompactionStatistics compactedStats = new CompactionStatistics();
private final CompactionStatistics skippedStats = new CompactionStatistics();
private final CompactionStatistics waitingStats = new CompactionStatistics();
private Builder(
- @NotNull String dataSource,
- @NotNull AutoCompactionScheduleStatus scheduleStatus
+ @NotNull String dataSource
)
{
if (dataSource == null || dataSource.isEmpty()) {
throw new ISE("Invalid dataSource name");
}
- if (scheduleStatus == null) {
- throw new ISE("scheduleStatus cannot be null");
- }
-
this.dataSource = dataSource;
- this.scheduleStatus = scheduleStatus;
+ }
+
+ public Builder withStatus(AutoCompactionScheduleStatus status)
+ {
+ this.scheduleStatus = Preconditions.checkNotNull(status, "scheduleStatus cannot be null");
+ return this;
}
public void incrementWaitingStats(CompactionStatistics entry)
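With this change the schedule status moves from a constructor argument to a builder setter, and builder(dataSource) already carries RUNNING as the default. A small illustrative sketch of the resulting usage; the datasource name is made up and the stats passed are empty.

import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;

public class SnapshotBuilderSketch
{
  public static void main(String[] args)
  {
    // builder("wiki") already carries status RUNNING; withStatus() is only needed to override it.
    final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder("wiki");
    builder.incrementWaitingStats(new CompactionStatistics());
  }
}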
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
index 6009dc12cf4..e2b98a32a92 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -37,18 +38,21 @@ public class ClusterCompactionConfig
private final Integer maxCompactionTaskSlots;
private final Boolean useAutoScaleSlots;
private final CompactionEngine engine;
+ private final CompactionCandidateSearchPolicy compactionPolicy;
@JsonCreator
public ClusterCompactionConfig(
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
- @JsonProperty("engine") @Nullable CompactionEngine engine
+ @JsonProperty("engine") @Nullable CompactionEngine engine,
+ @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
+ this.compactionPolicy = compactionPolicy;
this.engine = engine;
}
@@ -80,6 +84,13 @@ public class ClusterCompactionConfig
return engine;
}
+ @Nullable
+ @JsonProperty
+ public CompactionCandidateSearchPolicy getCompactionPolicy()
+ {
+ return compactionPolicy;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -93,12 +104,19 @@ public class ClusterCompactionConfig
return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio)
&& Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots)
&& Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots)
+ && Objects.equals(compactionPolicy, that.compactionPolicy)
&& engine == that.engine;
}
@Override
public int hashCode()
{
- return Objects.hash(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, engine);
+ return Objects.hash(
+ compactionTaskSlotRatio,
+ maxCompactionTaskSlots,
+ useAutoScaleSlots,
+ compactionPolicy,
+ engine
+ );
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java
new file mode 100644
index 00000000000..e738a3e7f0b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * This config must be bound on CliOverlord to enable running compaction supervisors
+ * on the Overlord. When compaction supervisors are enabled, the Coordinator
+ * does not run auto-compact duty.
+ */
+public class CompactionSupervisorConfig
+{
+ private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null);
+
+ @JsonProperty
+ private final boolean enabled;
+
+ public static CompactionSupervisorConfig defaultConfig()
+ {
+ return DEFAULT;
+ }
+
+ @JsonCreator
+ public CompactionSupervisorConfig(
+ @JsonProperty("enabled") @Nullable Boolean enabled
+ )
+ {
+ this.enabled = Configs.valueOrDefault(enabled, false);
+ }
+
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompactionSupervisorConfig that = (CompactionSupervisorConfig) o;
+ return enabled == that.enabled;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(enabled);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompactionSchedulerConfig{" +
+ "enabled=" + enabled +
+ '}';
+ }
+}
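The config is a simple Jackson-bound boolean with a false default. A deserialization sketch follows; the runtime property prefix used to bind it on the Overlord is not part of this diff and is therefore not shown.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;

public class SupervisorConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // A missing or null "enabled" falls back to false via Configs.valueOrDefault.
    final CompactionSupervisorConfig disabled = mapper.readValue("{}", CompactionSupervisorConfig.class);
    final CompactionSupervisorConfig enabled =
        mapper.readValue("{\"enabled\": true}", CompactionSupervisorConfig.class);

    System.out.println(disabled.isEnabled()); // false
    System.out.println(enabled.isEnabled());  // true
  }
}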
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 193b53a5d05..b4b2be78032 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -20,9 +20,11 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Period;
@@ -188,6 +190,13 @@ public class DataSourceCompactionConfig
return engine;
}
+ @Nullable
+ @JsonIgnore
+ public Granularity getSegmentGranularity()
+ {
+ return granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
index 7793b55c4b9..b35ba7d2938 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
+import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -38,14 +40,17 @@ public class DruidCompactionConfig
{
public static final String CONFIG_KEY = "coordinator.compaction.config";
+ private static final CompactionCandidateSearchPolicy DEFAULT_COMPACTION_POLICY
+ = new NewestSegmentFirstPolicy(null);
private static final DruidCompactionConfig EMPTY_INSTANCE
- = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null);
+ = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null, null);
private final List compactionConfigs;
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
private final CompactionEngine engine;
+ private final CompactionCandidateSearchPolicy compactionPolicy;
public DruidCompactionConfig withDatasourceConfigs(
List compactionConfigs
@@ -56,7 +61,8 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
- engine
+ engine,
+ compactionPolicy
);
}
@@ -69,7 +75,8 @@ public class DruidCompactionConfig
Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio),
Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots),
Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots),
- Configs.valueOrDefault(update.getEngine(), engine)
+ Configs.valueOrDefault(update.getEngine(), engine),
+ Configs.valueOrDefault(update.getCompactionPolicy(), compactionPolicy)
);
}
@@ -91,7 +98,8 @@ public class DruidCompactionConfig
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
- @JsonProperty("engine") @Nullable CompactionEngine compactionEngine
+ @JsonProperty("engine") @Nullable CompactionEngine compactionEngine,
+ @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy
)
{
this.compactionConfigs = Configs.valueOrDefault(compactionConfigs, Collections.emptyList());
@@ -99,6 +107,7 @@ public class DruidCompactionConfig
this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE);
this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false);
this.engine = Configs.valueOrDefault(compactionEngine, CompactionEngine.NATIVE);
+ this.compactionPolicy = Configs.valueOrDefault(compactionPolicy, DEFAULT_COMPACTION_POLICY);
}
@JsonProperty
@@ -139,7 +148,8 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
- engine
+ engine,
+ compactionPolicy
);
}
@@ -160,6 +170,12 @@ public class DruidCompactionConfig
return Optional.absent();
}
+ @JsonProperty
+ public CompactionCandidateSearchPolicy getCompactionPolicy()
+ {
+ return compactionPolicy;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -174,6 +190,7 @@ public class DruidCompactionConfig
maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
useAutoScaleSlots == that.useAutoScaleSlots &&
engine == that.engine &&
+ Objects.equals(compactionPolicy, that.compactionPolicy) &&
Objects.equals(compactionConfigs, that.compactionConfigs);
}
@@ -185,7 +202,8 @@ public class DruidCompactionConfig
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
- engine
+ engine,
+ compactionPolicy
);
}
@@ -198,6 +216,7 @@ public class DruidCompactionConfig
", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
", useAutoScaleSlots=" + useAutoScaleSlots +
", engine=" + engine +
+ ", compactionPolicy=" + compactionPolicy +
'}';
}
}
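The top-level config constructor now takes six arguments, and a null compactionPolicy falls back to the newest-segment-first default. A minimal construction sketch, assuming the @JsonCreator constructor is public (its modifier sits outside the hunk shown above); the slot ratio value is illustrative.

import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.coordinator.DruidCompactionConfig;

import java.util.Collections;

public class CompactionConfigSketch
{
  public static void main(String[] args)
  {
    final DruidCompactionConfig config = new DruidCompactionConfig(
        Collections.emptyList(), // per-datasource compaction configs
        0.1,                     // compactionTaskSlotRatio
        null,                    // maxCompactionTaskSlots -> defaults to Integer.MAX_VALUE
        null,                    // useAutoScaleSlots      -> defaults to false
        CompactionEngine.NATIVE, // engine
        null                     // compactionPolicy       -> defaults to NewestSegmentFirstPolicy
    );
    System.out.println(config);
  }
}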
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 5e468182fa7..6f907354f08 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
@@ -54,8 +55,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.compaction.CompactionRunSimulator;
+import org.apache.druid.server.compaction.CompactionSimulateResult;
+import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
-import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
@@ -153,12 +156,12 @@ public class DruidCoordinator
private final BalancerStrategyFactory balancerStrategyFactory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
+ private final CompactionStatusTracker compactionStatusTracker;
private final CompactSegments compactSegments;
@Nullable
private final CoordinatorSegmentMetadataCache coordinatorSegmentMetadataCache;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
-
private volatile boolean started = false;
/**
@@ -200,9 +203,9 @@ public class DruidCoordinator
CoordinatorCustomDutyGroups customDutyGroups,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector,
- CompactionSegmentSearchPolicy compactionSegmentSearchPolicy,
@Nullable CoordinatorSegmentMetadataCache coordinatorSegmentMetadataCache,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+ CompactionStatusTracker compactionStatusTracker
)
{
this.config = config;
@@ -220,7 +223,8 @@ public class DruidCoordinator
this.balancerStrategyFactory = config.getBalancerStrategyFactory();
this.lookupCoordinatorManager = lookupCoordinatorManager;
this.coordLeaderSelector = coordLeaderSelector;
- this.compactSegments = initializeCompactSegmentsDuty(compactionSegmentSearchPolicy);
+ this.compactionStatusTracker = compactionStatusTracker;
+ this.compactSegments = initializeCompactSegmentsDuty(this.compactionStatusTracker);
this.loadQueueManager = loadQueueManager;
this.coordinatorSegmentMetadataCache = coordinatorSegmentMetadataCache;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
@@ -341,12 +345,6 @@ public class DruidCoordinator
return replicaCountsInCluster == null ? null : replicaCountsInCluster.required();
}
- @Nullable
- public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
- {
- return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
- }
-
@Nullable
public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource)
{
@@ -358,6 +356,16 @@ public class DruidCoordinator
return compactSegments.getAutoCompactionSnapshot();
}
+ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest)
+ {
+ return new CompactionRunSimulator(compactionStatusTracker, overlordClient).simulateRunWithConfig(
+ metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest),
+ metadataManager.segments()
+ .getSnapshotOfDataSourcesWithAllUsedSegments()
+ .getUsedSegmentsTimelinesPerDataSource()
+ );
+ }
+
public String getCurrentLeader()
{
return coordLeaderSelector.getCurrentLeader();
@@ -533,6 +541,7 @@ public class DruidCoordinator
if (coordinatorSegmentMetadataCache != null) {
coordinatorSegmentMetadataCache.onLeaderStop();
}
+ compactionStatusTracker.stop();
taskMaster.onLeaderStop();
serviceAnnouncer.unannounce(self);
lookupCoordinatorManager.stop();
@@ -541,6 +550,20 @@ public class DruidCoordinator
}
}
+ /**
+ * Checks if compaction supervisors are enabled on the Overlord.
+ */
+ private boolean isCompactionSupervisorEnabled()
+ {
+ try {
+ return FutureUtils.getUnchecked(overlordClient.isCompactionSupervisorEnabled(), true);
+ }
+ catch (Exception e) {
+ // The Overlord is probably on an older version; assume that compaction supervisors are not enabled
+ return false;
+ }
+ }
+
@GuardedBy("lock")
private ScheduledExecutorService getOrCreateDutyGroupExecutor(String dutyGroup)
{
@@ -590,8 +613,7 @@ public class DruidCoordinator
duties.add(new KillStalePendingSegments(overlordClient));
}
- // CompactSegmentsDuty should be the last duty as it can take a long time to complete
- // We do not have to add compactSegments if it is already enabled in the custom duty group
+ // Do not add compactSegments if it is already included in the custom duty groups
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
duties.add(compactSegments);
}
@@ -625,11 +647,11 @@ public class DruidCoordinator
}
@VisibleForTesting
- CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy compactionSegmentSearchPolicy)
+ CompactSegments initializeCompactSegmentsDuty(CompactionStatusTracker statusTracker)
{
List compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
- return new CompactSegments(compactionSegmentSearchPolicy, overlordClient);
+ return new CompactSegments(statusTracker, overlordClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn(
@@ -735,6 +757,10 @@ public class DruidCoordinator
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
+ if (shouldSkipAutoCompactDuty(duty)) {
+ continue;
+ }
+
dutyRunTime.restart();
params = duty.run(params);
dutyRunTime.stop();
@@ -781,6 +807,26 @@ public class DruidCoordinator
}
}
+ /**
+ * @return true if this duty is the auto-compact CompactSegments duty and should
+ * be skipped because compaction supervisors are already running on the Overlord.
+ * Manually triggered compaction duties are always run.
+ */
+ private boolean shouldSkipAutoCompactDuty(CoordinatorDuty duty)
+ {
+ final boolean shouldSkipDuty = duty instanceof CompactSegments
+ && !COMPACT_SEGMENTS_DUTIES_DUTY_GROUP.equals(dutyGroupName)
+ && isCompactionSupervisorEnabled();
+ if (shouldSkipDuty) {
+ log.warn(
+ "Skipping Compact Segments duty in group[%s] since compaction"
+ + " supervisors are already running on Overlord.",
+ dutyGroupName
+ );
+ }
+ return shouldSkipDuty;
+ }
+
private void emitStat(CoordinatorStat stat, Map dimensionValues, long value)
{
ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder()
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
deleted file mode 100644
index bab7ca8f92f..00000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.compact;
-
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
- * (see {@link DataSegment#compareTo}).
- */
-public interface CompactionSegmentIterator extends Iterator
-{
- /**
- * Return a map of dataSourceName to CompactionStatistics.
- * This method returns the aggregated statistics of segments that was already compacted and does not need to be compacted
- * again. Hence, segment that were not returned by the {@link Iterator#next()} becuase it does not needs compaction.
- * Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
- */
- Map totalCompactedStatistics();
-
- /**
- * Return a map of dataSourceName to CompactionStatistics.
- * This method returns the aggregated statistics of segments that was skipped as it cannot be compacted.
- * Hence, segment that were not returned by the {@link Iterator#next()} becuase it cannot be compacted.
- * Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
- */
- Map totalSkippedStatistics();
-
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
deleted file mode 100644
index bc923da4f80..00000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.compact;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This policy searches segments for compaction from newest to oldest.
- */
-public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
-{
- private final ObjectMapper objectMapper;
-
- @Inject
- public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
- {
- this.objectMapper = objectMapper;
- }
-
- @Override
- public CompactionSegmentIterator createIterator(
- Map compactionConfigs,
- Map dataSources,
- Map> skipIntervals
- )
- {
- return new PriorityBasedCompactionSegmentIterator(
- compactionConfigs,
- dataSources,
- skipIntervals,
- (o1, o2) -> Comparators.intervalsByStartThenEnd()
- .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
- objectMapper
- );
- }
-
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 7b2392b8c66..a2f97f298af 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
@@ -34,11 +33,11 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.ClientMSQContext;
-import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.ISE;
@@ -48,13 +47,15 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.compaction.CompactionCandidate;
+import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
+import org.apache.druid.server.compaction.CompactionSegmentIterator;
+import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
-import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
-import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@@ -67,8 +68,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -90,22 +93,21 @@ public class CompactSegments implements CoordinatorCustomDuty
private static final Predicate IS_COMPACTION_TASK =
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
- private final CompactionSegmentSearchPolicy policy;
+ private final CompactionStatusTracker statusTracker;
private final OverlordClient overlordClient;
// This variable is updated by the Coordinator thread executing duties and
// read by HTTP threads processing Coordinator API calls.
private final AtomicReference> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
- @Inject
@JsonCreator
public CompactSegments(
- @JacksonInject CompactionSegmentSearchPolicy policy,
+ @JacksonInject CompactionStatusTracker statusTracker,
@JacksonInject OverlordClient overlordClient
)
{
- this.policy = policy;
this.overlordClient = overlordClient;
+ this.statusTracker = statusTracker;
resetCompactionSnapshot();
}
@@ -117,22 +119,36 @@ public class CompactSegments implements CoordinatorCustomDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+ {
+ run(
+ params.getCompactionConfig(),
+ params.getUsedSegmentsTimelinesPerDataSource(),
+ params.getCoordinatorStats()
+ );
+ return params;
+ }
+
+ public void run(
+ DruidCompactionConfig dynamicConfig,
+ Map dataSources,
+ CoordinatorRunStats stats
+ )
{
LOG.info("Running CompactSegments duty");
- final DruidCompactionConfig dynamicConfig = params.getCompactionConfig();
final int maxCompactionTaskSlots = dynamicConfig.getMaxCompactionTaskSlots();
if (maxCompactionTaskSlots <= 0) {
LOG.info("Skipping compaction as maxCompactionTaskSlots is [%d].", maxCompactionTaskSlots);
resetCompactionSnapshot();
- return params;
+ return;
}
+ statusTracker.onCompactionConfigUpdated(dynamicConfig);
List compactionConfigList = dynamicConfig.getCompactionConfigs();
if (compactionConfigList == null || compactionConfigList.isEmpty()) {
LOG.info("Skipping compaction as compaction config list is empty.");
resetCompactionSnapshot();
- return params;
+ return;
}
Map compactionConfigs = compactionConfigList
@@ -144,10 +160,15 @@ public class CompactSegments implements CoordinatorCustomDuty
// Fetch currently running compaction tasks
int busyCompactionTaskSlots = 0;
- final List compactionTasks = CoordinatorDutyUtils.getNumActiveTaskSlots(
+ final List compactionTasks = CoordinatorDutyUtils.getStatusOfActiveTasks(
overlordClient,
IS_COMPACTION_TASK
);
+
+ final Set activeTaskIds
+ = compactionTasks.stream().map(TaskStatusPlus::getId).collect(Collectors.toSet());
+ trackStatusOfCompletedTasks(activeTaskIds);
+
for (TaskStatusPlus status : compactionTasks) {
final TaskPayloadResponse response =
FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()), true);
@@ -194,9 +215,14 @@ public class CompactSegments implements CoordinatorCustomDuty
);
// Get iterator over segments to compact and submit compaction tasks
- Map dataSources = params.getUsedSegmentsTimelinesPerDataSource();
- final CompactionSegmentIterator iterator =
- policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction);
+ final CompactionCandidateSearchPolicy policy = dynamicConfig.getCompactionPolicy();
+ final CompactionSegmentIterator iterator = new PriorityBasedCompactionSegmentIterator(
+ policy,
+ compactionConfigs,
+ dataSources,
+ intervalsToSkipCompaction,
+ statusTracker
+ );
final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
final int availableCompactionTaskSlots
@@ -211,13 +237,10 @@ public class CompactSegments implements CoordinatorCustomDuty
dynamicConfig.getEngine()
);
- final CoordinatorRunStats stats = params.getCoordinatorStats();
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
-
- return params;
}
private void resetCompactionSnapshot()
@@ -225,6 +248,31 @@ public class CompactSegments implements CoordinatorCustomDuty
autoCompactionSnapshotPerDataSource.set(Collections.emptyMap());
}
+ /**
+ * Queries the Overlord for the status of all tasks that were submitted
+ * recently but are not active anymore. The statuses are then updated in the
+ * {@link #statusTracker}.
+ */
+ private void trackStatusOfCompletedTasks(Set activeTaskIds)
+ {
+ final Set finishedTaskIds = new HashSet<>(statusTracker.getSubmittedTaskIds());
+ finishedTaskIds.removeAll(activeTaskIds);
+
+ if (finishedTaskIds.isEmpty()) {
+ return;
+ }
+
+ final Map taskStatusMap
+ = FutureUtils.getUnchecked(overlordClient.taskStatuses(finishedTaskIds), true);
+ for (String taskId : finishedTaskIds) {
+ // Assume unknown task to have finished successfully
+ final TaskStatus taskStatus = taskStatusMap.getOrDefault(taskId, TaskStatus.success(taskId));
+ if (taskStatus.isComplete()) {
+ statusTracker.onTaskFinished(taskId, taskStatus);
+ }
+ }
+ }
+
/**
* Cancels a currently running compaction task if the segment granularity
* for this datasource has changed in the compaction config.
@@ -295,8 +343,7 @@ public class CompactSegments implements CoordinatorCustomDuty
* Returns the maximum number of task slots used by one native compaction task at any time when the task is
* issued with the given tuningConfig.
*/
- @VisibleForTesting
- static int findMaxNumTaskSlotsUsedByOneNativeCompactionTask(
+ public static int findMaxNumTaskSlotsUsedByOneNativeCompactionTask(
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig
)
{
@@ -392,12 +439,8 @@ public class CompactSegments implements CoordinatorCustomDuty
int totalTaskSlotsAssigned = 0;
while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
- final SegmentsToCompact entry = iterator.next();
- if (entry.isEmpty()) {
- throw new ISE("segmentsToCompact is empty?");
- }
-
- final String dataSourceName = entry.getFirst().getDataSource();
+ final CompactionCandidate entry = iterator.next();
+ final String dataSourceName = entry.getDataSource();
// As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
currentRunAutoCompactionSnapshotBuilders
@@ -408,7 +451,6 @@ public class CompactSegments implements CoordinatorCustomDuty
final List segmentsToCompact = entry.getSegments();
// Create granularitySpec to send to compaction task
- ClientCompactionTaskGranularitySpec granularitySpec;
Granularity segmentGranularityToUse = null;
if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
// Determines segmentGranularity from the segmentsToCompact
@@ -433,14 +475,14 @@ public class CompactSegments implements CoordinatorCustomDuty
} else {
segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
}
- granularitySpec = new ClientCompactionTaskGranularitySpec(
+ final ClientCompactionTaskGranularitySpec granularitySpec = new ClientCompactionTaskGranularitySpec(
segmentGranularityToUse,
config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null
);
// Create dimensionsSpec to send to compaction task
- ClientCompactionTaskDimensionsSpec dimensionsSpec;
+ final ClientCompactionTaskDimensionsSpec dimensionsSpec;
if (config.getDimensionsSpec() != null) {
dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
config.getDimensionsSpec().getDimensions()
@@ -506,7 +548,7 @@ public class CompactSegments implements CoordinatorCustomDuty
}
final String taskId = compactSegments(
- segmentsToCompact,
+ entry,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
config.getTuningConfig(),
@@ -552,28 +594,25 @@ public class CompactSegments implements CoordinatorCustomDuty
{
// Mark all the segments remaining in the iterator as "awaiting compaction"
while (iterator.hasNext()) {
- final SegmentsToCompact entry = iterator.next();
- if (!entry.isEmpty()) {
- final String dataSourceName = entry.getFirst().getDataSource();
- currentRunAutoCompactionSnapshotBuilders
- .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
- .incrementWaitingStats(entry.getStats());
- }
+ final CompactionCandidate entry = iterator.next();
+ currentRunAutoCompactionSnapshotBuilders
+ .computeIfAbsent(entry.getDataSource(), AutoCompactionSnapshot::builder)
+ .incrementWaitingStats(entry.getStats());
}
// Statistics of all segments considered compacted after this run
- iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> {
- currentRunAutoCompactionSnapshotBuilders
- .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
- .incrementCompactedStats(compactedStats);
- });
+ iterator.getCompactedSegments().forEach(
+ candidateSegments -> currentRunAutoCompactionSnapshotBuilders
+ .computeIfAbsent(candidateSegments.getDataSource(), AutoCompactionSnapshot::builder)
+ .incrementCompactedStats(candidateSegments.getStats())
+ );
// Statistics of all segments considered skipped after this run
- iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> {
- currentRunAutoCompactionSnapshotBuilders
- .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
- .incrementSkippedStats(dataSourceSkippedStatistics);
- });
+ iterator.getSkippedSegments().forEach(
+ candidateSegments -> currentRunAutoCompactionSnapshotBuilders
+ .computeIfAbsent(candidateSegments.getDataSource(), AutoCompactionSnapshot::builder)
+ .incrementSkippedStats(candidateSegments.getStats())
+ );
final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> {
@@ -604,16 +643,6 @@ public class CompactSegments implements CoordinatorCustomDuty
stats.add(Stats.Compaction.SKIPPED_INTERVALS, rowKey, autoCompactionSnapshot.getIntervalCountSkipped());
}
- @Nullable
- public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
- {
- AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get().get(dataSource);
- if (autoCompactionSnapshot == null) {
- return null;
- }
- return autoCompactionSnapshot.getBytesAwaitingCompaction();
- }
-
@Nullable
public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource)
{
@@ -626,10 +655,10 @@ public class CompactSegments implements CoordinatorCustomDuty
}
private String compactSegments(
- List<DataSegment> segments,
+ CompactionCandidate entry,
int compactionTaskPriority,
- @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
- @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ ClientCompactionTaskQueryTuningConfig tuningConfig,
+ ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@@ -638,6 +667,7 @@ public class CompactSegments implements CoordinatorCustomDuty
ClientCompactionRunnerInfo compactionRunner
)
{
+ final List<DataSegment> segments = entry.getSegments();
Preconditions.checkArgument(!segments.isEmpty(), "Expect non-empty segments to compact");
final String dataSource = segments.get(0).getDataSource();
@@ -651,7 +681,7 @@ public class CompactSegments implements CoordinatorCustomDuty
final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
- final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
+ final ClientCompactionTaskQuery taskPayload = new ClientCompactionTaskQuery(
taskId,
dataSource,
new ClientCompactionIOConfig(
@@ -667,6 +697,8 @@ public class CompactSegments implements CoordinatorCustomDuty
compactionRunner
);
FutureUtils.getUnchecked(overlordClient.runTask(taskId, taskPayload), true);
+ statusTracker.onTaskSubmitted(taskPayload, entry);
+
return taskId;
}
}
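Note on the new statusTracker.onTaskSubmitted(taskPayload, entry) call: it is the only behavioural addition inside compactSegments() and ties each submitted compaction task back to the candidate segments it was created for. A minimal sketch of what such a tracker could look like follows; it is purely illustrative, and the class name, map field, and import path of CompactionCandidate are assumptions, not the CompactionStatusTracker actually introduced by this patch.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
    // Assumed package for the new candidate class; adjust to wherever the patch places it.
    import org.apache.druid.server.compaction.CompactionCandidate;

    // Illustrative only: remembers which candidate segments each submitted
    // compaction task is working on, keyed by task id.
    class IllustrativeCompactionStatusTracker
    {
      private final Map<String, CompactionCandidate> submittedTasks = new ConcurrentHashMap<>();

      public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCandidate entry)
      {
        submittedTasks.put(taskPayload.getId(), entry);
      }
    }
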
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
index f6f31173fa5..1c31a28087b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
@@ -86,17 +86,16 @@ public class CoordinatorDutyUtils
}
/**
- * Return the number of active tasks that match the task predicate provided. The number of active tasks returned
- * may be an overestimate, as tasks that return status's with null types will be conservatively counted to match the
- * predicate provided.
+ * Fetches active task statuses that match the given predicate.
+ * Task statuses with null types are considered to satisfy the predicate too.
*
* @param overlordClient The overlord client to use to retrieve the list of active tasks.
- * @param taskPredicate The predicate to match against the list of retreived task status.
+ * @param taskPredicate The predicate to match against the list of retrieved task statuses.
* This predicate will never be called with a null task status.
*
- * @return the number of active tasks that match the task predicate provided
+ * @return Active task statuses that match the given predicate.
*/
- public static List<TaskStatusPlus> getNumActiveTaskSlots(
+ public static List<TaskStatusPlus> getStatusOfActiveTasks(
@Nonnull final OverlordClient overlordClient,
final Predicate<TaskStatusPlus> taskPredicate
)
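The rename above also sharpens the contract: the helper now returns the matching task statuses themselves and each caller derives the count it needs. A short usage sketch follows, assuming the return type is List<TaskStatusPlus> (as the .size() call in KillUnusedSegments below implies); the wrapper class, the TaskStatusPlus import, and the use of java.util.function.Predicate are illustrative assumptions.

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.druid.indexer.TaskStatusPlus;
    import org.apache.druid.rpc.indexing.OverlordClient;
    import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils;

    class KillSlotMath
    {
      // Mirrors the caller-side pattern used by KillUnusedSegments below:
      // fetch the matching active tasks once, then derive the available slots.
      static int availableKillTaskSlots(
          OverlordClient overlordClient,
          Predicate<TaskStatusPlus> isAutoKillTask,
          int killTaskCapacity
      )
      {
        final List<TaskStatusPlus> activeKillTasks =
            CoordinatorDutyUtils.getStatusOfActiveTasks(overlordClient, isAutoKillTask);
        return Math.max(0, killTaskCapacity - activeKillTasks.size());
      }
    }
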
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 64b61df5e53..199c3555b37 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -306,7 +306,7 @@ public class KillUnusedSegments implements CoordinatorDuty
final int availableKillTaskSlots = Math.max(
0,
- killTaskCapacity - CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
+ killTaskCapacity - CoordinatorDutyUtils.getStatusOfActiveTasks(overlordClient, IS_AUTO_KILL_TASK).size()
);
stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index 7bd4ee85124..8f9dfb9ca85 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -146,6 +146,7 @@ public class CoordinatorCompactionConfigsResource
compactionTaskSlotRatio,
maxCompactionTaskSlots,
useAutoScaleSlots,
+ null,
null
),
req
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java
similarity index 53%
rename from server/src/main/java/org/apache/druid/server/http/CompactionResource.java
rename to server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java
index e88d0cdacfb..015a27435cf 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java
@@ -22,13 +22,19 @@ package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
+import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -39,16 +45,19 @@ import javax.ws.rs.core.Response;
import java.util.Collection;
@Path("/druid/coordinator/v1/compaction")
-public class CompactionResource
+public class CoordinatorCompactionResource
{
private final DruidCoordinator coordinator;
+ private final OverlordClient overlordClient;
@Inject
- public CompactionResource(
- DruidCoordinator coordinator
+ public CoordinatorCompactionResource(
+ DruidCoordinator coordinator,
+ OverlordClient overlordClient
)
{
this.coordinator = coordinator;
+ this.overlordClient = overlordClient;
}
/**
@@ -72,11 +81,21 @@ public class CompactionResource
@QueryParam("dataSource") String dataSource
)
{
- final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
- if (notCompactedSegmentSizeBytes == null) {
- return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+ if (dataSource == null || dataSource.isEmpty()) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "No DataSource specified"))
+ .build();
+ }
+
+ if (isCompactionSupervisorEnabled()) {
+ return buildResponse(overlordClient.getBytesAwaitingCompaction(dataSource));
+ }
+
+ final AutoCompactionSnapshot snapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
+ if (snapshot == null) {
+ return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build();
} else {
- return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
+ return Response.ok(ImmutableMap.of("remainingSegmentSize", snapshot.getBytesAwaitingCompaction())).build();
}
}
@@ -88,16 +107,66 @@ public class CompactionResource
@QueryParam("dataSource") String dataSource
)
{
+ if (isCompactionSupervisorEnabled()) {
+ return buildResponse(overlordClient.getCompactionSnapshots(dataSource));
+ }
+
final Collection<AutoCompactionSnapshot> snapshots;
if (dataSource == null || dataSource.isEmpty()) {
snapshots = coordinator.getAutoCompactionSnapshot().values();
} else {
AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (autoCompactionSnapshot == null) {
- return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+ return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build();
}
snapshots = ImmutableList.of(autoCompactionSnapshot);
}
return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
}
+
+ @POST
+ @Path("/simulate")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response simulateWithClusterConfigUpdate(
+ ClusterCompactionConfig updatePayload
+ )
+ {
+ return Response.ok().entity(
+ coordinator.simulateRunWithConfigUpdate(updatePayload)
+ ).build();
+ }
+
+ private <T> Response buildResponse(ListenableFuture<T> future)
+ {
+ try {
+ return Response.ok(FutureUtils.getUnchecked(future, true)).build();
+ }
+ catch (Exception e) {
+ if (e.getCause() instanceof HttpResponseException) {
+ final HttpResponseException cause = (HttpResponseException) e.getCause();
+ return Response.status(cause.getResponse().getStatus().getCode())
+ .entity(cause.getResponse().getContent())
+ .build();
+ } else {
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ }
+ }
+
+ /**
+ * Checks if compaction supervisors are enabled on the Overlord.
+ */
+ private boolean isCompactionSupervisorEnabled()
+ {
+ try {
+ return FutureUtils.getUnchecked(overlordClient.isCompactionSupervisorEnabled(), true);
+ }
+ catch (Exception e) {
+ // Overlord is probably on an older version, assume that compaction supervisor is not enabled
+ return false;
+ }
+ }
}
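To summarise the routing this renamed resource now performs: each read endpoint first asks the Overlord whether compaction supervisors are enabled and proxies the call if so; otherwise it serves the Coordinator-side snapshot as before, which also covers Overlords too old to expose the new API thanks to the catch-all in isCompactionSupervisorEnabled(). A condensed sketch of that flow follows, using only identifiers that appear in the patch; the wrapper method itself is illustrative.

    // Condensed view of the delegation added above. overlordClient, coordinator,
    // buildResponse() and isCompactionSupervisorEnabled() all exist in
    // CoordinatorCompactionResource; only this wrapper method is illustrative.
    private Response getSnapshotForDataSource(String dataSource)
    {
      if (isCompactionSupervisorEnabled()) {
        // Supervisors own compaction: proxy to the Overlord and relay its status code on failure.
        return buildResponse(overlordClient.getCompactionSnapshots(dataSource));
      }
      // Otherwise serve the snapshot computed by the Coordinator's CompactSegments duty.
      final AutoCompactionSnapshot snapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
      return snapshot == null
             ? Response.status(Response.Status.NOT_FOUND)
                       .entity(ImmutableMap.of("error", "Unknown DataSource"))
                       .build()
             : Response.ok(ImmutableMap.of("latestStatus", ImmutableList.of(snapshot))).build();
    }
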
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
index 10ebeb53af2..c73ff1ca059 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -122,6 +123,24 @@ public class NoopOverlordClient implements OverlordClient
throw new UnsupportedOperationException();
}
+ @Override
+ public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 6eccbccaa84..f82cfbf2a04 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
@@ -174,7 +175,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(i, 0)
@@ -209,7 +210,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(10 + i, 0)
@@ -243,7 +244,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(20 + i, 0)
@@ -257,7 +258,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
}
derbyConnector.retryWithHandle(
- handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, DS.WIKI, false)
+ handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, TestDataSource.WIKI, false)
);
final Map segmentToReplaceLock
@@ -272,7 +273,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Set<DataSegment> allCommittedSegments
= new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
Map upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
- DS.WIKI,
+ TestDataSource.WIKI,
allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
);
// Verify the segments present in the metadata store
@@ -901,7 +902,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
- DS.WIKI,
+ TestDataSource.WIKI,
intervals,
Segments.ONLY_VISIBLE
);
@@ -920,7 +921,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
.anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval)));
final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
- DS.WIKI,
+ TestDataSource.WIKI,
ImmutableList.of(outOfRangeInterval),
Segments.ONLY_VISIBLE
);
@@ -934,7 +935,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final List segments = createAndGetUsedYearSegments(1900, 2133);
final Collection<DataSegment> actualUsedSegments = coordinator.retrieveAllUsedSegments(
- DS.WIKI,
+ TestDataSource.WIKI,
Segments.ONLY_VISIBLE
);
@@ -949,7 +950,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("1900/3000"),
null,
null
@@ -967,7 +968,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final int requestedLimit = segments.size();
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("1900/3000"),
requestedLimit,
null
@@ -985,7 +986,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final int requestedLimit = segments.size() - 1;
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("1900/3000"),
requestedLimit,
null
@@ -1003,7 +1004,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final int limit = segments.size() + 1;
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("1900/3000"),
limit,
null
@@ -1024,7 +1025,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final int limit = segments.size() + 1;
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
outOfRangeInterval,
limit,
null
@@ -1540,7 +1541,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
for (DataSegment unusedSegment : unusedSegments) {
Assertions.assertThat(
coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-01-04"),
ImmutableList.of(unusedSegment.getVersion()),
null,
@@ -1551,7 +1552,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assertions.assertThat(
coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-01-04"),
ImmutableList.of(v1, v2),
null,
@@ -1561,7 +1562,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assertions.assertThat(
coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-01-04"),
null,
null,
@@ -1571,7 +1572,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assertions.assertThat(
coordinator.retrieveUnusedSegmentsForInterval(
- DS.WIKI,
+ TestDataSource.WIKI,
Intervals.of("2023-01-01/2023-01-04"),
ImmutableList.of("some-non-existent-version"),
null,
@@ -3235,7 +3236,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
// Allocate and commit a data segment by appending to the same interval
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
- DS.WIKI,
+ TestDataSource.WIKI,
"seq",
tombstoneSegment.getVersion(),
interval,
@@ -3260,7 +3261,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
- DS.WIKI,
+ TestDataSource.WIKI,
Segments.ONLY_VISIBLE
);
@@ -3290,7 +3291,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
// Allocate and commit a data segment by appending to the same interval
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
- DS.WIKI,
+ TestDataSource.WIKI,
"seq",
tombstoneSegment.getVersion(),
interval,
@@ -3315,7 +3316,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
- DS.WIKI,
+ TestDataSource.WIKI,
Segments.ONLY_VISIBLE
);
@@ -3329,7 +3330,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
public void testSegmentIdShouldNotBeReallocated()
{
final SegmentIdWithShardSpec idWithNullTaskAllocator = coordinator.allocatePendingSegment(
- DS.WIKI,
+ TestDataSource.WIKI,
"seq",
"0",
Intervals.ETERNITY,
@@ -3345,7 +3346,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
final SegmentIdWithShardSpec idWithValidTaskAllocator = coordinator.allocatePendingSegment(
- DS.WIKI,
+ TestDataSource.WIKI,
"seq",
"1",
Intervals.ETERNITY,
@@ -3363,12 +3364,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
// Insert pending segments
coordinator.commitSegments(ImmutableSet.of(dataSegment0, dataSegment1), null);
// Clean up pending segments corresponding to the valid task allocator id
- coordinator.deletePendingSegmentsForTaskAllocatorId(DS.WIKI, "taskAllocatorId");
+ coordinator.deletePendingSegmentsForTaskAllocatorId(TestDataSource.WIKI, "taskAllocatorId");
// Mark all segments as unused
- coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
+ coordinator.markSegmentsAsUnusedWithinInterval(TestDataSource.WIKI, Intervals.ETERNITY);
final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
- DS.WIKI,
+ TestDataSource.WIKI,
"seq",
"2",
Intervals.ETERNITY,
@@ -3406,7 +3407,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
),
null
);
- coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
+ coordinator.markSegmentsAsUnusedWithinInterval(TestDataSource.WIKI, Intervals.ETERNITY);
DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
Intervals.of("2024/2025"),
@@ -3417,7 +3418,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
List<String> unusedSegmentIdsForIntervalAndVersion =
- coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1");
+ coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(TestDataSource.WIKI, Intervals.of("2024/2025"), "v1");
Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
Assert.assertEquals(
unusedSegmentForExactIntervalAndVersion.getId().toString(),
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index 2076e5ffa46..cb0bea36c20 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.TestDataSource;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
@@ -310,15 +311,10 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
protected FingerprintGenerator fingerprintGenerator;
protected SegmentSchemaTestUtils segmentSchemaTestUtils;
- protected static class DS
- {
- static final String WIKI = "wiki";
- }
-
protected DataSegment createSegment(Interval interval, String version, ShardSpec shardSpec)
{
return DataSegment.builder()
- .dataSource(DS.WIKI)
+ .dataSource(TestDataSource.WIKI)
.interval(interval)
.version(version)
.shardSpec(shardSpec)
@@ -365,7 +361,7 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
tablesConfig,
mapper
)
- .retrieveUnusedSegments(DS.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+ .retrieveUnusedSegments(TestDataSource.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -384,13 +380,8 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
return derbyConnector.inReadOnlyTransaction(
(handle, status) -> {
try (final CloseableIterator iterator =
- SqlSegmentsMetadataQuery.forHandle(
- handle,
- derbyConnector,
- tablesConfig,
- mapper
- )
- .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+ SqlSegmentsMetadataQuery.forHandle(handle, derbyConnector, tablesConfig, mapper)
+ .retrieveUnusedSegmentsPlus(TestDataSource.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
index efeadbdc04e..82a87dd15a9 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataRuleManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.TestDataSource;
import org.apache.druid.server.audit.AuditSerdeHelper;
import org.apache.druid.server.audit.SQLAuditManager;
import org.apache.druid.server.audit.SQLAuditManagerConfig;
@@ -52,8 +53,6 @@ import java.util.Map;
public class SQLMetadataRuleManagerTest
{
- private static final String DATASOURCE = "wiki";
-
@org.junit.Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@@ -108,12 +107,12 @@ public class SQLMetadataRuleManagerTest
null
)
);
- ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("override rule"));
+ ruleManager.overrideRule(TestDataSource.WIKI, rules, createAuditInfo("override rule"));
// New rule should be reflected in the in-memory rules map immediately after being set by user
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
- Assert.assertEquals(1, allRules.get(DATASOURCE).size());
- Assert.assertEquals(rules.get(0), allRules.get(DATASOURCE).get(0));
+ Assert.assertEquals(1, allRules.get(TestDataSource.WIKI).size());
+ Assert.assertEquals(rules.get(0), allRules.get(TestDataSource.WIKI).get(0));
}
@Test
@@ -122,7 +121,7 @@ public class SQLMetadataRuleManagerTest
// Datasource level rules cannot be null
IAE exception = Assert.assertThrows(
IAE.class,
- () -> ruleManager.overrideRule(DATASOURCE, null, createAuditInfo("null rule"))
+ () -> ruleManager.overrideRule(TestDataSource.WIKI, null, createAuditInfo("null rule"))
);
Assert.assertEquals("Rules cannot be null.", exception.getMessage());
@@ -155,7 +154,7 @@ public class SQLMetadataRuleManagerTest
// Datasource level rules can be empty
Assert.assertTrue(
ruleManager.overrideRule(
- DATASOURCE,
+ TestDataSource.WIKI,
Collections.emptyList(),
createAuditInfo("empty rule")
)
@@ -173,14 +172,14 @@ public class SQLMetadataRuleManagerTest
)
);
final AuditInfo auditInfo = createAuditInfo("create audit entry");
- ruleManager.overrideRule(DATASOURCE, rules, auditInfo);
+ ruleManager.overrideRule(TestDataSource.WIKI, rules, auditInfo);
// fetch rules from metadata storage
ruleManager.poll();
- Assert.assertEquals(rules, ruleManager.getRules(DATASOURCE));
+ Assert.assertEquals(rules, ruleManager.getRules(TestDataSource.WIKI));
// verify audit entry is created
- List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(DATASOURCE, "rules", null);
+ List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(TestDataSource.WIKI, "rules", null);
Assert.assertEquals(1, auditEntries.size());
AuditEntry entry = auditEntries.get(0);
@@ -189,7 +188,7 @@ public class SQLMetadataRuleManagerTest
mapper.readValue(entry.getPayload().serialized(), new TypeReference<List<Rule>>() {})
);
Assert.assertEquals(auditInfo, entry.getAuditInfo());
- Assert.assertEquals(DATASOURCE, entry.getKey());
+ Assert.assertEquals(TestDataSource.WIKI, entry.getKey());
}
@Test
@@ -205,12 +204,12 @@ public class SQLMetadataRuleManagerTest
)
);
final AuditInfo auditInfo = createAuditInfo("test_comment");
- ruleManager.overrideRule(DATASOURCE, rules, auditInfo);
+ ruleManager.overrideRule(TestDataSource.WIKI, rules, auditInfo);
ruleManager.overrideRule("test_dataSource2", rules, auditInfo);
// fetch rules from metadata storage
ruleManager.poll();
- Assert.assertEquals(rules, ruleManager.getRules(DATASOURCE));
+ Assert.assertEquals(rules, ruleManager.getRules(TestDataSource.WIKI));
Assert.assertEquals(rules, ruleManager.getRules("test_dataSource2"));
// test fetch audit entries
@@ -235,13 +234,13 @@ public class SQLMetadataRuleManagerTest
null
)
);
- ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("test"));
+ ruleManager.overrideRule(TestDataSource.WIKI, rules, createAuditInfo("test"));
// Verify that the rule was added
ruleManager.poll();
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
- Assert.assertEquals(1, allRules.get(DATASOURCE).size());
+ Assert.assertEquals(1, allRules.get(TestDataSource.WIKI).size());
// Now delete rules
ruleManager.removeRulesForEmptyDatasourcesOlderThan(System.currentTimeMillis());
@@ -262,13 +261,13 @@ public class SQLMetadataRuleManagerTest
null
)
);
- ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("update rules"));
+ ruleManager.overrideRule(TestDataSource.WIKI, rules, createAuditInfo("update rules"));
// Verify that rule was added
ruleManager.poll();
Map