From d6c760f7ce74b5864cd7d0750736f1609d171fdf Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Mon, 15 Jul 2024 14:07:53 +0530 Subject: [PATCH] Do not kill segments with referenced load specs from deep storage (#16667) Do not kill segments with referenced load specs from deep storage --- .../ActionBasedPublishedSegmentRetriever.java | 2 +- .../RetrieveUpgradedFromSegmentIdsAction.java | 90 +++++++ .../RetrieveUpgradedToSegmentIdsAction.java | 95 ++++++++ .../indexing/common/actions/TaskAction.java | 2 + .../actions/UpgradedFromSegmentsResponse.java | 44 ++++ .../actions/UpgradedToSegmentsResponse.java | 46 ++++ .../common/task/KillUnusedSegmentsTask.java | 150 ++++++++++-- .../common/task/IngestionTestBase.java | 13 +- .../task/KillUnusedSegmentsTaskTest.java | 227 +++++++++++++++++- .../indexing/test/TestDataSegmentKiller.java | 13 +- ...TestIndexerMetadataStorageCoordinator.java | 18 ++ .../IndexerMetadataStorageCoordinator.java | 17 ++ .../IndexerSQLMetadataStorageCoordinator.java | 118 ++++++++- .../druid/metadata/PendingSegmentRecord.java | 5 +- .../druid/metadata/SQLMetadataConnector.java | 10 + .../metadata/SqlSegmentsMetadataQuery.java | 13 +- .../druid/server/http/DataSegmentPlus.java | 23 +- ...exerSQLMetadataStorageCoordinatorTest.java | 197 ++++++++++++++- ...SqlMetadataStorageCoordinatorTestBase.java | 49 ++++ .../server/http/DataSegmentPlusTest.java | 3 +- .../server/http/MetadataResourceTest.java | 2 +- 21 files changed, 1080 insertions(+), 57 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java index ba5cf923b12..bb349cc9790 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java @@ -79,7 +79,7 @@ public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRet catch (Exception e) { log.warn( e, - "Could not retrieve published segment IDs[%s] using task action[segmentListById]." + "Could not retrieve published segment IDs[%s] using task action[retrieveSegmentsById]." + " Overlord maybe on an older version, retrying with action[segmentListUsed]." 
+ " This task may fail to publish segments if there is a concurrent replace happening.", serializedSegmentIds diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java new file mode 100644 index 00000000000..67f7ae6e131 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.indexing.common.task.Task; + +import java.util.Set; + +/** + * Task action to retrieve the segment IDs from which a given set of segments were upgraded. + */ +public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction +{ + private final String dataSource; + private final Set segmentIds; + + @JsonCreator + public RetrieveUpgradedFromSegmentIdsAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segmentIds") Set segmentIds + ) + { + this.dataSource = dataSource; + this.segmentIds = segmentIds; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Set getSegmentIds() + { + return segmentIds; + } + + @Override + public TypeReference getReturnTypeReference() + { + return new TypeReference() + { + }; + } + + @Override + public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox) + { + return new UpgradedFromSegmentsResponse( + toolbox.getIndexerMetadataStorageCoordinator() + .retrieveUpgradedFromSegmentIds(dataSource, segmentIds) + ); + } + + @Override + public boolean isAudited() + { + return false; + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "dataSource='" + dataSource + '\'' + + ", segmentIds=" + segmentIds + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java new file mode 100644 index 00000000000..412c9604d11 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.indexing.common.task.Task; + +import java.util.Set; + +/** + * Task action to determine the set of all segments containing the same load spec given the parent id.
+ * Returns a map from a segment ID to a set containing:
+ * <ol>
+ * <li>all segment IDs that were upgraded from it AND are still present in the metadata store</li>
+ * <li>the segment ID itself if and only if it is still present in the metadata store</li>
+ * </ol>
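+ * <p>
+ * As an illustration (segment IDs here are hypothetical): if {@code wiki_v1} was upgraded to create
+ * {@code wiki_v2} and {@code wiki_v3}, and {@code wiki_v3} has since been removed from the
+ * metadata store, the response for input {@code wiki_v1} would contain the entry
+ * {@code wiki_v1 -> {wiki_v1, wiki_v2}}, assuming {@code wiki_v1} itself is still present.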
+ */ +public class RetrieveUpgradedToSegmentIdsAction implements TaskAction +{ + private final String dataSource; + private final Set segmentIds; + + @JsonCreator + public RetrieveUpgradedToSegmentIdsAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segmentIds") Set segmentIds + ) + { + this.dataSource = dataSource; + this.segmentIds = segmentIds; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Set getSegmentIds() + { + return segmentIds; + } + + @Override + public TypeReference getReturnTypeReference() + { + return new TypeReference() + { + }; + } + + @Override + public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox) + { + return new UpgradedToSegmentsResponse( + toolbox.getIndexerMetadataStorageCoordinator() + .retrieveUpgradedToSegmentIds(dataSource, segmentIds) + ); + } + + @Override + public boolean isAudited() + { + return false; + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "dataSource='" + dataSource + '\'' + + ", segmentIds=" + segmentIds + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 4606bd597a8..973a83ecee4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -39,6 +39,8 @@ import java.util.concurrent.Future; @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), @JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class), + @JsonSubTypes.Type(name = "retrieveUpgradedFromSegmentIds", value = RetrieveUpgradedFromSegmentIdsAction.class), + @JsonSubTypes.Type(name = "retrieveUpgradedToSegmentIds", value = RetrieveUpgradedToSegmentIdsAction.class), @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), @JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class), @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java new file mode 100644 index 00000000000..5f0f1775f16 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +public class UpgradedFromSegmentsResponse +{ + private final Map upgradedFromSegmentIds; + + @JsonCreator + public UpgradedFromSegmentsResponse( + @JsonProperty("upgradedFromSegmentIds") Map upgradedFromSegmentIds + ) + { + this.upgradedFromSegmentIds = upgradedFromSegmentIds; + } + + @JsonProperty + public Map getUpgradedFromSegmentIds() + { + return upgradedFromSegmentIds; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java new file mode 100644 index 00000000000..e9bf33a97ce --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; +import java.util.Set; + +public class UpgradedToSegmentsResponse +{ + + private final Map> upgradedToSegmentIds; + + @JsonCreator + public UpgradedToSegmentsResponse( + @JsonProperty("upgradedToSegmentIds") Map> upgradedToSegmentIds + ) + { + this.upgradedToSegmentIds = upgradedToSegmentIds; + } + + @JsonProperty + public Map> getUpgradedToSegmentIds() + { + return upgradedToSegmentIds; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index fe49569a3bb..e1f6d2915ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -35,11 +35,14 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveUpgradedFromSegmentIdsAction; +import org.apache.druid.indexing.common.actions.RetrieveUpgradedToSegmentIdsAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentNukeAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.actions.UpgradedToSegmentsResponse; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -47,6 +50,8 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -54,6 +59,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -63,9 +69,23 @@ import java.util.TreeMap; import java.util.stream.Collectors; /** + *

 * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
 * JSON serialization fields of this class must correspond to those of {@link
 * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <p>
+ * The Kill task fetches the set of used segments for the interval and computes the set of their load specs.
+ * Until `limit` segments have been processed in total or all segments for the interval have been nuked:
+ * <ol>
+ * <li>Fetch at most `batchSize` unused segments from the metadata store.</li>
+ * <li>Determine the mapping from these segments to their parents *before* nuking the segments.</li>
+ * <li>Nuke the batch of unused segments from the metadata store.</li>
+ * <li>Determine the mapping of the set of parents to all their children.</li>
+ * <li>Check if the parents or any of their children still exist in the metadata store.</li>
+ * <li>Find the unreferenced segments.</li>
+ * <li>Filter the set of unreferenced segments using load specs from the set of used segments.</li>
+ * <li>Kill the filtered set of segments from deep storage.</li>
+ * </ol>
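+ * <p>
+ * A condensed sketch of one batch iteration (the helper names below are illustrative,
+ * not actual methods of this class):
+ * <pre>{@code
+ * List<DataSegment> unused = fetchUnusedSegments(interval, batchSize);             // step 1
+ * Map<String, String> parentIds = retrieveUpgradedFromSegmentIds(unused);          // step 2, before the nuke
+ * nuke(unused);                                                                    // step 3, metadata cleanup first
+ * List<DataSegment> killable = findUnreferenced(unused, parentIds, usedLoadSpecs); // steps 4-7
+ * kill(killable);                                                                  // step 8, deep storage cleanup last
+ * }</pre>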
*/ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask { @@ -76,7 +96,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask * Default nuke batch size. This is a small enough size that we still get value from batching, while * yielding as quickly as possible. In one real cluster environment backed with mysql, ~2000rows/sec, * with batch size of 100, means a batch should only less than a second for the task lock, and depending - * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time + * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time, * we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be yielding * the task lockbox every 1-2 seconds. */ @@ -97,13 +117,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask /** * Maximum number of segments that can be killed. */ - @Nullable private final Integer limit; + @Nullable + private final Integer limit; /** * The maximum used status last updated time. Any segments with * {@code used_status_last_updated} no later than this time will be included in the kill task. */ - @Nullable private final DateTime maxUsedStatusLastUpdatedTime; + @Nullable + private final DateTime maxUsedStatusLastUpdatedTime; @JsonCreator public KillUnusedSegmentsTask( @@ -196,18 +218,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "." ); + final TaskActionClient taskActionClient = toolbox.getTaskActionClient(); RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction( getDataSource(), ImmutableList.of(getInterval()), Segments.INCLUDING_OVERSHADOWED ); // Fetch the load specs of all segments overlapping with the unused segment intervals - final Set> usedSegmentLoadSpecs = - new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction) - .stream() - .map(DataSegment::getLoadSpec) - .collect(Collectors.toSet()) - ); + final Set> usedSegmentLoadSpecs = taskActionClient.submit(retrieveUsedSegmentsAction) + .stream() + .map(DataSegment::getLoadSpec) + .collect(Collectors.toSet()); do { if (nextBatchSize <= 0) { @@ -231,20 +252,47 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask ); } - // Kill segments - // Order is important here: we want the nuke action to clean up the metadata records _before_ the - // segments are removed from storage, this helps maintain that we will always have a storage segment if - // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is - // abandoned. + // Kill segments. Order is important here: + // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked + // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage. + // This helps maintain that we will always have a storage segment if the metadata segment is present. + // Determine the subset of segments to be killed from deep storage based on loadspecs. + // If the segment nuke throws an exception, then the segment cleanup is abandoned. 
- toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); + // Determine upgraded segment ids before nuking + final Set segmentIds = unusedSegments.stream() + .map(DataSegment::getId) + .map(SegmentId::toString) + .collect(Collectors.toSet()); + final Map upgradedFromSegmentIds = new HashMap<>(); + try { + upgradedFromSegmentIds.putAll( + taskActionClient.submit( + new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds) + ).getUpgradedFromSegmentIds() + ); + } + catch (Exception e) { + LOG.warn( + e, + "Could not retrieve parent segment ids using task action[retrieveUpgradedFromSegmentIds]." + + " Overlord may be on an older version." + ); + } - // Kill segments from the deep storage only if their load specs are not being used by any used segments - final List segmentsToBeKilled = unusedSegments - .stream() - .filter(unusedSegment -> unusedSegment.getLoadSpec() == null - || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec())) - .collect(Collectors.toList()); + // Nuke Segments + taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); + + // Determine segments to be killed + final List segmentsToBeKilled + = getKillableSegments(unusedSegments, upgradedFromSegmentIds, usedSegmentLoadSpecs, taskActionClient); + + final Set segmentsNotKilled = new HashSet<>(unusedSegments); + segmentsToBeKilled.forEach(segmentsNotKilled::remove); + LOG.infoSegments( + segmentsNotKilled, + "Skipping segment kill from deep storage as their load specs are referenced by other segments." + ); toolbox.getDataSegmentKiller().kill(segmentsToBeKilled); numBatchesProcessed++; @@ -253,7 +301,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId()); nextBatchSize = computeNextBatchSize(numSegmentsKilled); - } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)); + } while (!unusedSegments.isEmpty() && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)); final String taskId = getId(); LOG.info( @@ -300,6 +348,64 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask return taskLockMap; } + /** + * Determines subset of segments without referenced load specs that can be safely killed by + * looking at the segment upgrades and used segment load specs + * @param unusedSegments input segments + * @param upgradedFromSegmentIds segment to parent mapping + * @param usedSegmentLoadSpecs load specs of used segments + * @param taskActionClient task action client + * @return list of segments to kill from deep storage + */ + private List getKillableSegments( + List unusedSegments, + Map upgradedFromSegmentIds, + Set> usedSegmentLoadSpecs, + TaskActionClient taskActionClient + ) + { + + // Determine parentId for each unused segment + final Map> parentIdToUnusedSegments = new HashMap<>(); + for (DataSegment segment : unusedSegments) { + final String segmentId = segment.getId().toString(); + parentIdToUnusedSegments.computeIfAbsent( + upgradedFromSegmentIds.getOrDefault(segmentId, segmentId), + k -> new HashSet<>() + ).add(segment); + } + + // Check if the parent or any of its children exist in metadata store + try { + UpgradedToSegmentsResponse response = taskActionClient.submit( + new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet()) + ); + if (response != null && response.getUpgradedToSegmentIds() != null) { + 
response.getUpgradedToSegmentIds().forEach((parent, children) -> { + if (!CollectionUtils.isNullOrEmpty(children)) { + // Do not kill segment if its parent or any of its siblings still exist in metadata store + parentIdToUnusedSegments.remove(parent); + } + }); + } + } + catch (Exception e) { + LOG.warn( + e, + "Could not retrieve referenced ids using task action[retrieveUpgradedToSegmentIds]." + + " Overlord may be on an older version." + ); + } + + // Filter using the used segment load specs as segment upgrades predate the above task action + return parentIdToUnusedSegments.values() + .stream() + .flatMap(Set::stream) + .filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec())) + .collect(Collectors.toList()); + } + + @Override public LookupLoadingSpec getLookupLoadingSpec() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 133ced3907d..d6687efbf34 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; @@ -81,7 +82,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; -import org.apache.druid.segment.loading.NoopDataSegmentKiller; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.metadata.SegmentSchemaCache; @@ -130,6 +130,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private SegmentSchemaManager segmentSchemaManager; private SegmentSchemaCache segmentSchemaCache; private SupervisorManager supervisorManager; + private TestDataSegmentKiller dataSegmentKiller; protected File reportsFile; @Before @@ -169,6 +170,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest lockbox = new TaskLockbox(taskStorage, storageCoordinator); segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper()); reportsFile = temporaryFolder.newFile(); + dataSegmentKiller = new TestDataSegmentKiller(); } @After @@ -243,6 +245,11 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest return testUtils.getRowIngestionMetersFactory(); } + public TestDataSegmentKiller getDataSegmentKiller() + { + return dataSegmentKiller; + } + public TaskActionToolbox createTaskActionToolbox() { storageCoordinator.start(); @@ -265,7 +272,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false)) .taskActionClient(createActionClient(task)) .segmentPusher(new LocalDataSegmentPusher(new 
LocalDataSegmentPusherConfig())) - .dataSegmentKiller(new NoopDataSegmentKiller()) + .dataSegmentKiller(dataSegmentKiller) .joinableFactory(NoopJoinableFactory.INSTANCE) .jsonMapper(objectMapper) .taskWorkDir(baseDir) @@ -450,7 +457,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false)) .taskActionClient(taskActionClient) .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig())) - .dataSegmentKiller(new NoopDataSegmentKiller()) + .dataSegmentKiller(dataSegmentKiller) .joinableFactory(NoopJoinableFactory.INSTANCE) .jsonMapper(objectMapper) .taskWorkDir(baseDir) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index f888ace5a54..fe2b5a51c86 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.timeline.DataSegment; import org.assertj.core.api.Assertions; @@ -72,10 +73,10 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase taskRunner = new TestTaskRunner(); final String version = DateTimes.nowUtc().toString(); - segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); - segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); - segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); - segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); + segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version).withLoadSpec(ImmutableMap.of("k", 1)); + segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version).withLoadSpec(ImmutableMap.of("k", 2)); + segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version).withLoadSpec(ImmutableMap.of("k", 3)); + segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version).withLoadSpec(ImmutableMap.of("k", 4)); } @Test @@ -125,6 +126,212 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase new KillTaskReport.Stats(1, 2), getReportedStats() ); + Assert.assertEquals(ImmutableSet.of(segment3), getDataSegmentKiller().getKilledSegments()); + } + + @Test + public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception + { + final Map upgradeSegmentMapping = ImmutableMap.of( + segment1.getId().toString(), + "nonExistentParent", + segment2.getId().toString(), + "nonExistentParent" + ); + insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping); + getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY); + + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.ETERNITY) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List observedUnusedSegments = + 
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + null, + null, + null + ); + + Assert.assertEquals(Collections.emptyList(), observedUnusedSegments); + + Assert.assertEquals( + new KillTaskReport.Stats(2, 2), + getReportedStats() + ); + Assert.assertEquals(ImmutableSet.of(segment1, segment2), getDataSegmentKiller().getKilledSegments()); + } + + @Test + public void testKillSegmentsDoNotDeleteReferencedSibling() throws Exception + { + final Map upgradeSegmentMapping = ImmutableMap.of( + segment1.getId().toString(), + "nonExistentParent", + segment2.getId().toString(), + "nonExistentParent" + ); + insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping); + getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY); + + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(segment1.getInterval()) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + null, + null, + null + ); + + Assert.assertEquals(Collections.singletonList(segment2), observedUnusedSegments); + + Assert.assertEquals( + new KillTaskReport.Stats(0, 2), + getReportedStats() + ); + Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments()); + } + + @Test + public void testKillSegmentsDoNotDeleteParentWithReferencedChildren() throws Exception + { + final Map upgradeSegmentMapping = ImmutableMap.of( + segment1.getId().toString(), + segment3.getId().toString(), + segment2.getId().toString(), + segment3.getId().toString() + ); + insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping); + getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId()); + getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId()); + + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.ETERNITY) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + null, + null, + null + ); + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments); + Assertions.assertThat( + getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + Segments.ONLY_VISIBLE + ) + ).containsExactlyInAnyOrder(segment1); + + Assert.assertEquals( + new KillTaskReport.Stats(0, 2), + getReportedStats() + ); + Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments()); + } + + @Test + public void testKillSegmentsDoNotDeleteChildrenWithReferencedParent() throws Exception + { + final Map upgradeSegmentMapping = ImmutableMap.of( + segment1.getId().toString(), + segment3.getId().toString(), + segment2.getId().toString(), + segment3.getId().toString() + ); + insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping); + getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId()); + getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId()); + + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.ETERNITY) + .build(); + + 
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + null, + null, + null + ); + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments); + Assertions.assertThat( + getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + Segments.ONLY_VISIBLE + ) + ).containsExactlyInAnyOrder(segment3); + + Assert.assertEquals( + new KillTaskReport.Stats(0, 2), + getReportedStats() + ); + Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments()); + } + + @Test + public void testKillSegmentsDeleteChildrenAndParent() throws Exception + { + final Map upgradeSegmentMapping = ImmutableMap.of( + segment1.getId().toString(), + segment3.getId().toString(), + segment2.getId().toString(), + segment3.getId().toString() + ); + insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping); + getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId()); + getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId()); + getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId()); + + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.ETERNITY) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.ETERNITY, + null, + null, + null + ); + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments); + + Assert.assertEquals( + new KillTaskReport.Stats(3, 2), + getReportedStats() + ); + Assert.assertEquals(ImmutableSet.of(segment1, segment2, segment3), getDataSegmentKiller().getKilledSegments()); } @Test @@ -1247,4 +1454,16 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 10L ); } + + private void insertUsedSegments(Set segments, Map upgradedFromSegmentIdMap) + { + final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); + IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments( + segments, + upgradedFromSegmentIdMap, + derbyConnectorRule.getConnector(), + table, + getObjectMapper() + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java index 33421eb1a5c..92581f6dd1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java @@ -22,12 +22,18 @@ package org.apache.druid.indexing.test; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.timeline.DataSegment; +import java.util.HashSet; +import java.util.Set; + public class TestDataSegmentKiller implements DataSegmentKiller { + + private final Set killedSegments = new HashSet<>(); + @Override public void kill(DataSegment segment) { - // do nothing + killedSegments.add(segment); } @Override @@ -35,4 +41,9 @@ public class TestDataSegmentKiller implements DataSegmentKiller { throw new UnsupportedOperationException("not implemented"); } + + public Set getKilledSegments() + { + return killedSegments; + } } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 61a57e94842..d2055d6e0c9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -314,6 +314,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto throw new UnsupportedOperationException(); } + @Override + public Map retrieveUpgradedFromSegmentIds( + final String dataSource, + final Set segmentIds + ) + { + return Collections.emptyMap(); + } + + @Override + public Map> retrieveUpgradedToSegmentIds( + final String dataSource, + final Set segmentIds + ) + { + return Collections.emptyMap(); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index c055a8d9e9f..83b4ac7e474 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -473,4 +473,21 @@ public interface IndexerMetadataStorageCoordinator * @return List of pending segment records */ List getPendingSegments(String datasource, Interval interval); + + /** + * Map from a segment ID to the segment ID from which it was upgraded + * There should be no entry in the map for an original non-upgraded segment + * @param dataSource data source + * @param segmentIds ids of segments + */ + Map retrieveUpgradedFromSegmentIds(String dataSource, Set segmentIds); + + /** + * Map from a segment ID to a set containing + * 1) all segment IDs that were upgraded from it AND are still present in the metadata store + * 2) the segment ID itself if and only if it is still present in the metadata store + * @param dataSource data source + * @param segmentIds ids of the first segments which had the corresponding load spec + */ + Map> retrieveUpgradedToSegmentIds(String dataSource, Set segmentIds); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index fd637728908..54f75ccb920 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -564,6 +564,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask); Map upgradeSegmentMetadata = new HashMap<>(); + final Map upgradedFromSegmentIdMap = new HashMap<>(); for (DataSegmentPlus dataSegmentPlus : upgradedSegments) { segmentsToInsert.add(dataSegmentPlus.getDataSegment()); if (dataSegmentPlus.getSchemaFingerprint() != null && dataSegmentPlus.getNumRows() != null) { @@ -572,6 +573,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor new SegmentMetadata(dataSegmentPlus.getNumRows(), dataSegmentPlus.getSchemaFingerprint()) ); } + if (dataSegmentPlus.getUpgradedFromSegmentId() != null) { + 
upgradedFromSegmentIdMap.put( + dataSegmentPlus.getDataSegment().getId().toString(), + dataSegmentPlus.getUpgradedFromSegmentId() + ); + } } SegmentPublishResult result = SegmentPublishResult.ok( insertSegments( @@ -579,7 +586,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor segmentsToInsert, segmentSchemaMapping, upgradeSegmentMetadata, - Collections.emptyMap() + Collections.emptyMap(), + upgradedFromSegmentIdMap ), upgradePendingSegmentsOverlappingWith(segmentsToInsert) ); @@ -1408,6 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final Set allSegmentsToInsert = new HashSet<>(appendSegments); final Map newVersionSegmentToParent = new HashMap<>(); final Map segmentIdMap = new HashMap<>(); + final Map upgradedFromSegmentIdMap = new HashMap<>(); appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment)); segmentIdsForNewVersions.forEach( pendingSegment -> { @@ -1415,6 +1424,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId()); final SegmentId newVersionSegmentId = pendingSegment.getId().asSegmentId(); newVersionSegmentToParent.put(newVersionSegmentId, oldSegment.getId()); + upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), oldSegment.getId().toString()); allSegmentsToInsert.add( new DataSegment( pendingSegment.getId().asSegmentId(), @@ -1473,7 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor allSegmentsToInsert, segmentSchemaMapping, Collections.emptyMap(), - newVersionSegmentToParent + newVersionSegmentToParent, + upgradedFromSegmentIdMap ) ); }, @@ -2092,7 +2103,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("version", segment.getVersion()) .bind("used", usedSegments.contains(segment)) .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .bind("used_status_last_updated", now); + .bind("used_status_last_updated", now) + .bind("upgraded_from_segment_id", (String) null); if (schemaPersistEnabled) { Long numRows = null; @@ -2217,6 +2229,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .shardSpec(shardSpec) .build(); + // When the segment already has an upgraded_from_segment_id, reuse it for its children + final String upgradedFromSegmentId = oldSegmentMetadata.getUpgradedFromSegmentId() == null + ? 
oldSegmentMetadata.getDataSegment().getId().toString() + : oldSegmentMetadata.getUpgradedFromSegmentId(); + upgradedSegments.add( new DataSegmentPlus( dataSegment, @@ -2224,7 +2241,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor null, null, oldSegmentMetadata.getSchemaFingerprint(), - oldSegmentMetadata.getNumRows()) + oldSegmentMetadata.getNumRows(), + upgradedFromSegmentId + ) ); } @@ -2266,7 +2285,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Set segments, @Nullable SegmentSchemaMapping segmentSchemaMapping, Map upgradeSegmentMetadata, - Map newVersionForAppendToParent + Map newVersionForAppendToParent, + Map upgradedFromSegmentIdMap ) throws IOException { boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping); @@ -2302,7 +2322,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("version", segment.getVersion()) .bind("used", true) .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .bind("used_status_last_updated", now); + .bind("used_status_last_updated", now) + .bind("upgraded_from_segment_id", upgradedFromSegmentIdMap.get(segment.getId().toString())); if (schemaPersistEnabled) { SegmentMetadata segmentMetadata = @@ -2449,9 +2470,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor { String insertStatement = "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s," - + " partitioned, version, used, payload, used_status_last_updated %3$s) " + + " partitioned, version, used, payload, used_status_last_updated, upgraded_from_segment_id %3$s) " + "VALUES (:id, :dataSource, :created_date, :start, :end," - + " :partitioned, :version, :used, :payload, :used_status_last_updated %4$s)"; + + " :partitioned, :version, :used, :payload, :used_status_last_updated, :upgraded_from_segment_id %4$s)"; if (schemaPersistEnabled) { return StringUtils.format( @@ -2923,6 +2944,87 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor ); } + @Override + public Map retrieveUpgradedFromSegmentIds( + final String dataSource, + final Set segmentIds + ) + { + if (segmentIds.isEmpty()) { + return Collections.emptyMap(); + } + + final List segmentIdList = ImmutableList.copyOf(segmentIds); + final String sql = StringUtils.format( + "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", + dbTables.getSegmentsTable(), + SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList) + ); + final Map upgradedFromSegmentIds = new HashMap<>(); + connector.retryWithHandle( + handle -> { + Query> query = handle.createQuery(sql) + .bind("dataSource", dataSource); + SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query); + return query.map((index, r, ctx) -> { + final String id = r.getString(1); + final String upgradedFromSegmentId = r.getString(2); + if (upgradedFromSegmentId != null) { + upgradedFromSegmentIds.put(id, upgradedFromSegmentId); + } + return null; + }).list(); + } + ); + return upgradedFromSegmentIds; + } + + @Override + public Map> retrieveUpgradedToSegmentIds( + final String dataSource, + final Set segmentIds + ) + { + if (segmentIds.isEmpty()) { + return Collections.emptyMap(); + } + + final List upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds); + final String sql = StringUtils.format( + "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", + dbTables.getSegmentsTable(), + 
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn( + "upgraded_from_segment_id", + upgradedFromSegmentIdList + ) + ); + final Map> upgradedToSegmentIds = new HashMap<>(); + retrieveSegmentsById(dataSource, segmentIds) + .stream() + .map(DataSegment::getId) + .map(SegmentId::toString) + .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id)); + connector.retryWithHandle( + handle -> { + Query> query = handle.createQuery(sql) + .bind("dataSource", dataSource); + SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition( + "upgraded_from_segment_id", + upgradedFromSegmentIdList, + query + ); + return query.map((index, r, ctx) -> { + final String upgradedToId = r.getString(1); + final String id = r.getString(2); + upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()) + .add(upgradedToId); + return null; + }).list(); + } + ); + return upgradedToSegmentIds; + } + private static class PendingSegmentsRecord { private final String sequenceName; diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index bfbaad18ef1..f117fe7f28b 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -40,7 +40,10 @@ import java.sql.ResultSet; *
 * <ul>
 * <li>id -> id (Unique identifier for pending segment)</li>
 * <li>sequence_name -> sequenceName (sequence name used for segment allocation)</li>
 * <li>sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation)</li>
- * <li>upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded)</li>
+ * <li>upgraded_from_segment_id -> upgradedFromSegmentId
+ * (ID of the segment which was upgraded to create the current segment.
+ * If the former was itself created as a result of an upgrade, then this ID
+ * must refer to the original non-upgraded segment in the hierarchy.)</li>
 * <li>task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment)</li>
 * </ul>
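+ * <p>
+ * For example (segment IDs are hypothetical): if segment {@code A} is upgraded to create
+ * {@code B}, and {@code B} is later upgraded to create {@code C}, then the
+ * upgraded_from_segment_id of both {@code B} and {@code C} is {@code A}.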
  • * */ diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2d315d19fc8..dc87b9fc2fd 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -587,6 +587,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector Map columnNameTypes = new HashMap<>(); columnNameTypes.put("used_status_last_updated", "VARCHAR(255)"); + columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)"); + if (centralizedDatasourceSchemaConfig.isEnabled()) { columnNameTypes.put("schema_fingerprint", "VARCHAR(255)"); columnNameTypes.put("num_rows", "BIGINT"); @@ -619,6 +621,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector } alterTable(tableName, alterCommands); + + final Set createdIndexSet = getIndexOnTable(tableName); + createIndex( + tableName, + StringUtils.format("idx_%1$s_datasource_upgraded_from_segment_id", tableName), + ImmutableList.of("dataSource", "upgraded_from_segment_id"), + createdIndexSet + ); } @Override diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index f14cc995050..fc1c84a7037 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -286,7 +286,7 @@ public class SqlSegmentsMetadataQuery if (includeSchemaInfo) { final Query> query = handle.createQuery( StringUtils.format( - "SELECT payload, used, schema_fingerprint, num_rows FROM %s WHERE dataSource = :dataSource %s", + "SELECT payload, used, schema_fingerprint, num_rows, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds) ) ); @@ -306,7 +306,8 @@ public class SqlSegmentsMetadataQuery null, r.getBoolean(2), schemaFingerprint, - numRows + numRows, + r.getString(5) ); } ) @@ -314,7 +315,7 @@ public class SqlSegmentsMetadataQuery } else { final Query> query = handle.createQuery( StringUtils.format( - "SELECT payload, used FROM %s WHERE dataSource = :dataSource %s", + "SELECT payload, used, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds) ) ); @@ -331,7 +332,8 @@ public class SqlSegmentsMetadataQuery null, r.getBoolean(2), null, - null + null, + r.getString(3) ) ) .iterator(); @@ -864,6 +866,7 @@ public class SqlSegmentsMetadataQuery DateTimes.of(r.getString(3)), null, null, + null, null )) .iterator(); @@ -980,7 +983,7 @@ public class SqlSegmentsMetadataQuery * * @see #getParameterizedInConditionForColumn(String, List) */ - private static void bindColumnValuesToQueryWithInCondition( + static void bindColumnValuesToQueryWithInCondition( final String columnName, final List values, final SQLStatement query diff --git a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java index 9841e09a1a7..bfda5cbf3ad 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java @@ -36,6 +36,8 @@ import java.util.Objects; *
 * <ul>
 * <li>{@link DataSegmentPlus#createdDate} - The time when the segment was created.</li>
 * <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate} - The time when the segment's
 * used status was last updated.</li>
+ * <li>{@link DataSegmentPlus#upgradedFromSegmentId} - The segment ID to which the same load spec originally belonged.
+ * Load specs can be shared as a result of segment version upgrades.</li>
 * </ul>
 *
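+ * <p>
+ * A trimmed example of the JSON serialization (all values are illustrative):
+ * <pre>{@code
+ * {
+ *   "dataSegment": { ... },
+ *   "createdDate": "2024-01-01T00:00:00.000Z",
+ *   "usedStatusLastUpdatedDate": "2024-01-02T00:00:00.000Z",
+ *   "used": false,
+ *   "schemaFingerprint": "abc123",
+ *   "numRows": 100,
+ *   "upgradedFromSegmentId": "wiki_2023-01-01T00:00:00.000Z_2023-02-01T00:00:00.000Z_v1"
+ * }
+ * }</pre>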

    * This class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}. @@ -53,6 +55,9 @@ public class DataSegmentPlus private final String schemaFingerprint; private final Long numRows; + @Nullable + private final String upgradedFromSegmentId; + @JsonCreator public DataSegmentPlus( @JsonProperty("dataSegment") final DataSegment dataSegment, @@ -60,7 +65,8 @@ public class DataSegmentPlus @JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime usedStatusLastUpdatedDate, @JsonProperty("used") @Nullable final Boolean used, @JsonProperty("schemaFingerprint") @Nullable final String schemaFingerprint, - @JsonProperty("numRows") @Nullable final Long numRows + @JsonProperty("numRows") @Nullable final Long numRows, + @JsonProperty("upgradedFromSegmentId") @Nullable final String upgradedFromSegmentId ) { this.dataSegment = dataSegment; @@ -69,6 +75,7 @@ public class DataSegmentPlus this.used = used; this.schemaFingerprint = schemaFingerprint; this.numRows = numRows; + this.upgradedFromSegmentId = upgradedFromSegmentId; } @Nullable @@ -112,6 +119,13 @@ public class DataSegmentPlus return numRows; } + @Nullable + @JsonProperty + public String getUpgradedFromSegmentId() + { + return upgradedFromSegmentId; + } + @Override public boolean equals(Object o) { @@ -127,7 +141,8 @@ public class DataSegmentPlus && Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate()) && Objects.equals(used, that.getUsed()) && Objects.equals(schemaFingerprint, that.getSchemaFingerprint()) - && Objects.equals(numRows, that.getNumRows()); + && Objects.equals(numRows, that.getNumRows()) + && Objects.equals(upgradedFromSegmentId, that.getUpgradedFromSegmentId()); } @Override @@ -139,7 +154,8 @@ public class DataSegmentPlus usedStatusLastUpdatedDate, used, schemaFingerprint, - numRows + numRows, + upgradedFromSegmentId ); } @@ -153,6 +169,7 @@ public class DataSegmentPlus ", used=" + getUsed() + ", schemaFingerprint=" + getSchemaFingerprint() + ", numRows=" + getNumRows() + + ", upgradedFromSegmentId=" + getUpgradedFromSegmentId() + '}'; } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 222c1ece89f..f352d5e2609 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -138,8 +138,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata final String v1 = "2023-01-01"; final String v2 = "2023-01-02"; final String v3 = "2023-01-03"; + final String alreadyUpgradedVersion = "2023-02-01"; final String lockVersion = "2024-01-01"; + final String taskAllocatorId = "appendTask"; final String replaceTaskId = "replaceTask1"; final ReplaceTaskLock replaceLock = new ReplaceTaskLock( replaceTaskId, @@ -148,6 +150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); final Set appendSegments = new HashSet<>(); + final List pendingSegmentsForTask = new ArrayList<>(); final Set expectedSegmentsToUpgrade = new HashSet<>(); for (int i = 0; i < 10; i++) { final DataSegment segment = createSegment( @@ -157,6 +160,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); appendSegments.add(segment); expectedSegmentsToUpgrade.add(segment); + // Add the same segment + 
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 222c1ece89f..f352d5e2609 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -138,8 +138,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     final String v1 = "2023-01-01";
     final String v2 = "2023-01-02";
     final String v3 = "2023-01-03";
+    final String alreadyUpgradedVersion = "2023-02-01";
     final String lockVersion = "2024-01-01";
 
+    final String taskAllocatorId = "appendTask";
     final String replaceTaskId = "replaceTask1";
     final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
         replaceTaskId,
@@ -148,6 +150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
 
     final Set<DataSegment> appendSegments = new HashSet<>();
+    final List<PendingSegmentRecord> pendingSegmentsForTask = new ArrayList<>();
     final Set<DataSegment> expectedSegmentsToUpgrade = new HashSet<>();
     for (int i = 0; i < 10; i++) {
       final DataSegment segment = createSegment(
@@ -157,6 +160,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       );
       appendSegments.add(segment);
       expectedSegmentsToUpgrade.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v1,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
     for (int i = 0; i < 10; i++) {
@@ -167,6 +195,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       );
       appendSegments.add(segment);
       expectedSegmentsToUpgrade.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v2,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(10 + i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
     for (int i = 0; i < 10; i++) {
@@ -176,23 +229,78 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
           new LinearShardSpec(i)
       );
       appendSegments.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v3,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(20 + i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
+    derbyConnector.retryWithHandle(
+        handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, DS.WIKI, false)
+    );
+
     final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock = expectedSegmentsToUpgrade.stream()
         .collect(Collectors.toMap(s -> s, s -> replaceLock));
 
     // Commit the segment and verify the results
     SegmentPublishResult commitResult
-        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append", null);
+        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, taskAllocatorId, null);
     Assert.assertTrue(commitResult.isSuccess());
-    Assert.assertEquals(appendSegments, commitResult.getSegments());
-    // Verify the segments present in the metadata store
-    Assert.assertEquals(
-        appendSegments,
-        ImmutableSet.copyOf(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()))
+    Set<DataSegment> allCommittedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+        DS.WIKI,
+        allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
     );
+    // Verify the segments present in the metadata store
+    Assert.assertTrue(allCommittedSegments.containsAll(appendSegments));
+    for (DataSegment segment : appendSegments) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(segment.getId().toString()));
+    }
+    allCommittedSegments.removeAll(appendSegments);
+
+    // Verify the commit of upgraded pending segments
+    Assert.assertEquals(appendSegments.size(), allCommittedSegments.size());
+    Map<String, DataSegment> segmentMap = new HashMap<>();
+    for (DataSegment segment : appendSegments) {
+      segmentMap.put(segment.getId().toString(), segment);
+    }
+    for (DataSegment segment : allCommittedSegments) {
+      for (PendingSegmentRecord pendingSegmentRecord : pendingSegmentsForTask) {
+        if (pendingSegmentRecord.getId().asSegmentId().toString().equals(segment.getId().toString())) {
+          DataSegment upgradedFromSegment = segmentMap.get(pendingSegmentRecord.getUpgradedFromSegmentId());
+          Assert.assertNotNull(upgradedFromSegment);
+          Assert.assertEquals(segment.getLoadSpec(), upgradedFromSegment.getLoadSpec());
+          Assert.assertEquals(
+              pendingSegmentRecord.getUpgradedFromSegmentId(),
+              upgradedFromSegmentIdMap.get(segment.getId().toString())
+          );
+        }
+      }
+    }
 
     // Verify entries in the segment task lock table
     final Set<String> expectedUpgradeSegmentIds
@@ -290,12 +398,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
         retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()).size()
     );
 
-    final Set<DataSegment> usedSegments = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    final Set<DataSegment> usedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+
+    final Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+        "foo",
+        usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+    );
 
     Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
+    for (DataSegment appendSegment : segmentsAppendedWithReplaceLock) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(appendSegment.getId().toString()));
+    }
     usedSegments.removeAll(segmentsAppendedWithReplaceLock);
 
     Assert.assertTrue(usedSegments.containsAll(replacingSegments));
+    for (DataSegment replaceSegment : replacingSegments) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(replaceSegment.getId().toString()));
+    }
     usedSegments.removeAll(replacingSegments);
 
     Assert.assertEquals(segmentsAppendedWithReplaceLock.size(), usedSegments.size());
@@ -303,6 +423,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     boolean hasBeenCarriedForward = false;
     for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
       if (appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec())) {
+        Assert.assertEquals(
+            appendedSegment.getId().toString(),
+            upgradedFromSegmentIdMap.get(segmentReplicaWithNewVersion.getId().toString())
+        );
         hasBeenCarriedForward = true;
         break;
       }
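The assertions above all reduce to one invariant: upgraded_from_segment_id records a single hop back to the originally ingested segment (the test binds it to segment.getId() even for the second-generation pending segments), so the owner of a load spec can be resolved without a transitive walk. A sketch of that resolution with illustrative names, not Druid API:

import java.util.HashMap;
import java.util.Map;

public class LoadSpecLineage
{
  // Returns the id of the segment that owns the load spec backing segmentId:
  // the segment itself for directly ingested data, else the segment it was
  // upgraded from. The map holds a single hop, as populated by this patch.
  static String loadSpecOwner(String segmentId, Map<String, String> upgradedFrom)
  {
    return upgradedFrom.getOrDefault(segmentId, segmentId);
  }

  public static void main(String[] args)
  {
    Map<String, String> upgradedFrom = new HashMap<>();
    upgradedFrom.put("wiki_2023_v2_0", "wiki_2023_v1_0"); // upgraded copy -> original
    System.out.println(loadSpecOwner("wiki_2023_v2_0", upgradedFrom)); // wiki_2023_v1_0
    System.out.println(loadSpecOwner("wiki_2023_v1_0", upgradedFrom)); // wiki_2023_v1_0
  }
}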
@@ -3300,4 +3424,63 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
         unusedSegmentIdsForIntervalAndVersion.get(0)
     );
   }
+
+  @Test
+  public void testRetrieveUpgradedFromSegmentIds()
+  {
+    final String datasource = defaultSegment.getDataSource();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+    upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+    coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+    upgradedFromSegmentIdMap.clear();
+    upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    expected.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+
+    Set<String> segmentIds = new HashSet<>();
+    segmentIds.add(defaultSegment.getId().toString());
+    segmentIds.add(defaultSegment2.getId().toString());
+    segmentIds.add(defaultSegment3.getId().toString());
+    segmentIds.add(defaultSegment4.getId().toString());
+    Assert.assertEquals(
+        expected,
+        coordinator.retrieveUpgradedFromSegmentIds(datasource, segmentIds)
+    );
+  }
+
+  @Test
+  public void testRetrieveUpgradedToSegmentIds()
+  {
+    final String datasource = defaultSegment.getDataSource();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+    upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+    coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+    upgradedFromSegmentIdMap.clear();
+    upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+    Map<String, Set<String>> expected = new HashMap<>();
+    expected.put(defaultSegment.getId().toString(), new HashSet<>());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment.getId().toString());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment2.getId().toString());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment3.getId().toString());
+
+    Set<String> upgradedIds = new HashSet<>();
+    upgradedIds.add(defaultSegment.getId().toString());
+    Assert.assertEquals(
+        expected,
+        coordinator.retrieveUpgradedToSegmentIds(datasource, upgradedIds)
+    );
+  }
+
+  private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnector, table, mapper);
+  }
 }
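The expected map in testRetrieveUpgradedToSegmentIds pins down the shape of the reverse lookup: for each queried id, the result contains the queried segment itself (while it still has a row, even an unused one, as with defaultSegment2 above) plus every row recorded as upgraded from it. A sketch of that contract over in-memory maps, not the coordinator's actual SQL:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UpgradedToIndex
{
  // For each queried id, collect every segment row whose load spec resolves to it:
  // the queried segment itself when it still has a row, plus all rows recorded as
  // upgraded from it, regardless of used status.
  static Map<String, Set<String>> upgradedTo(
      Set<String> queriedIds,
      Set<String> allSegmentIds,                  // every row in the segments table
      Map<String, String> upgradedFromBySegmentId // row id -> upgraded_from_segment_id
  )
  {
    Map<String, Set<String>> result = new HashMap<>();
    for (String id : allSegmentIds) {
      String root = upgradedFromBySegmentId.getOrDefault(id, id);
      if (queriedIds.contains(root)) {
        result.computeIfAbsent(root, k -> new HashSet<>()).add(id);
      }
    }
    return result;
  }
}

Running this over the test's fixture (rows ds1..ds4, with ds2 and ds3 upgraded from ds1, querying {ds1}) yields {ds1 -> {ds1, ds2, ds3}}, matching the expected map above.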
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index ce0e0686058..2076e5ffa46 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -322,6 +323,8 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
                       .version(version)
                       .shardSpec(shardSpec)
                       .size(100)
+                      // hash to get a unique load spec as segmentId has not yet been generated
+                      .loadSpec(ImmutableMap.of("hash", Objects.hash(interval, version, shardSpec)))
                       .build();
   }
 
@@ -559,4 +562,50 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
       }
     );
   }
+
+  public static void insertUsedSegments(
+      Set<DataSegment> dataSegments,
+      Map<String, String> upgradedFromSegmentIdMap,
+      SQLMetadataConnector connector,
+      String table,
+      ObjectMapper jsonMapper
+  )
+  {
+    connector.retryWithHandle(
+        handle -> {
+          PreparedBatch preparedBatch = handle.prepareBatch(
+              StringUtils.format(
+                  "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version,"
+                  + " used, payload, used_status_last_updated, upgraded_from_segment_id) "
+                  + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version,"
+                  + " :used, :payload, :used_status_last_updated, :upgraded_from_segment_id)",
+                  table,
+                  connector.getQuoteString()
+              )
+          );
+          for (DataSegment segment : dataSegments) {
+            String id = segment.getId().toString();
+            preparedBatch.add()
+                         .bind("id", id)
+                         .bind("dataSource", segment.getDataSource())
+                         .bind("created_date", DateTimes.nowUtc().toString())
+                         .bind("start", segment.getInterval().getStart().toString())
+                         .bind("end", segment.getInterval().getEnd().toString())
+                         .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
+                         .bind("version", segment.getVersion())
+                         .bind("used", true)
+                         .bind("payload", jsonMapper.writeValueAsBytes(segment))
+                         .bind("used_status_last_updated", DateTimes.nowUtc().toString())
+                         .bind("upgraded_from_segment_id", upgradedFromSegmentIdMap.get(segment.getId().toString()));
+          }
+
+          final int[] affectedRows = preparedBatch.execute();
+          final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+          if (!succeeded) {
+            throw new ISE("Failed to publish segments to DB");
+          }
+          return true;
+        }
+    );
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
index 0f20fc96bdc..b963f433708 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -100,6 +100,7 @@ public class DataSegmentPlusTest
         usedStatusLastUpdatedDate,
         null,
         null,
+        null,
         null
     );
 
@@ -108,7 +109,7 @@ public class DataSegmentPlusTest
         JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
     );
 
-    Assert.assertEquals(6, objectMap.size());
+    Assert.assertEquals(7, objectMap.size());
     final Map<String, Object> segmentObjectMap = MAPPER.readValue(
         MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
         JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 4d6bbf5929b..9c52d639300 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -77,7 +77,7 @@ public class MetadataResourceTest
       .toArray(new DataSegment[0]);
 
   private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
-      .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null))
+      .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null, null))
       .collect(Collectors.toList());
   private HttpServletRequest request;
   private SegmentsMetadataManager segmentsMetadataManager;
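Taken together, these pieces support the behavior in the patch title. A rough sketch of the kill-side decision, with illustrative names only (the real logic lives in KillUnusedSegmentsTask and the two new task actions): a segment file in deep storage may be removed only when every segment sharing its load spec is part of the same kill.

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class KillFilterSketch
{
  // killCandidates: unused segment ids selected for deletion.
  // upgradedFrom:   candidate id -> id it was upgraded from (absent for originals),
  //                 in the spirit of RetrieveUpgradedFromSegmentIdsAction.
  // upgradedTo:     original id -> all ids sharing that load spec,
  //                 in the spirit of RetrieveUpgradedToSegmentIdsAction.
  static Set<String> safeToDeleteFromDeepStorage(
      Set<String> killCandidates,
      Map<String, String> upgradedFrom,
      Map<String, Set<String>> upgradedTo
  )
  {
    Set<String> safe = new HashSet<>();
    for (String candidate : killCandidates) {
      String owner = upgradedFrom.getOrDefault(candidate, candidate);
      Set<String> sharers = upgradedTo.getOrDefault(owner, Collections.singleton(candidate));
      // Delete the underlying file only if no segment outside this kill batch
      // still references the same load spec.
      if (killCandidates.containsAll(sharers)) {
        safe.add(candidate);
      }
    }
    return safe;
  }
}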