Do not kill segments with referenced load specs from deep storage (#16667)

AmatyaAvadhanula 2024-07-15 14:07:53 +05:30 committed by GitHub
parent 656667ee89
commit d6c760f7ce
21 changed files with 1080 additions and 57 deletions


@ -79,7 +79,7 @@ public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRet
catch (Exception e) {
log.warn(
e,
"Could not retrieve published segment IDs[%s] using task action[segmentListById]."
"Could not retrieve published segment IDs[%s] using task action[retrieveSegmentsById]."
+ " Overlord maybe on an older version, retrying with action[segmentListUsed]."
+ " This task may fail to publish segments if there is a concurrent replace happening.",
serializedSegmentIds


@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import java.util.Set;
/**
* Task action to retrieve the segment IDs from which a given set of segments were upgraded.
*/
public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction<UpgradedFromSegmentsResponse>
{
private final String dataSource;
private final Set<String> segmentIds;
@JsonCreator
public RetrieveUpgradedFromSegmentIdsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentIds") Set<String> segmentIds
)
{
this.dataSource = dataSource;
this.segmentIds = segmentIds;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Set<String> getSegmentIds()
{
return segmentIds;
}
@Override
public TypeReference<UpgradedFromSegmentsResponse> getReturnTypeReference()
{
return new TypeReference<UpgradedFromSegmentsResponse>()
{
};
}
@Override
public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
{
return new UpgradedFromSegmentsResponse(
toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUpgradedFromSegmentIds(dataSource, segmentIds)
);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", segmentIds=" + segmentIds +
'}';
}
}
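For orientation, a minimal sketch of how a task could submit this action; the toolbox variable, datasource name and segment ID are hypothetical, and imports and exception handling are omitted:

// Sketch only: inside a task's runTask(toolbox) method.
final Set<String> ids = Collections.singleton("wiki_2024-01-01T00:00:00.000Z_2024-01-02T00:00:00.000Z_v1");
final Map<String, String> upgradedFrom = toolbox.getTaskActionClient()
    .submit(new RetrieveUpgradedFromSegmentIdsAction("wiki", ids))
    .getUpgradedFromSegmentIds();
// Only segments created by an upgrade have an entry; original, non-upgraded segments are absent from the map.

The action is registered under the JSON type name retrieveUpgradedFromSegmentIds in the TaskAction subtype list later in this commit.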


@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import java.util.Set;
/**
* Task action to determine the set of all segments containing the same load spec given the parent id. <br/>
* Returns a map from a segment ID to a set containing:
* <ol>
* <li> all segment IDs that were upgraded from it AND are still present in the metadata store </li>
* <li> the segment ID itself if and only if it is still present in the metadata store </li>
* </ol>
*/
public class RetrieveUpgradedToSegmentIdsAction implements TaskAction<UpgradedToSegmentsResponse>
{
private final String dataSource;
private final Set<String> segmentIds;
@JsonCreator
public RetrieveUpgradedToSegmentIdsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentIds") Set<String> segmentIds
)
{
this.dataSource = dataSource;
this.segmentIds = segmentIds;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Set<String> getSegmentIds()
{
return segmentIds;
}
@Override
public TypeReference<UpgradedToSegmentsResponse> getReturnTypeReference()
{
return new TypeReference<UpgradedToSegmentsResponse>()
{
};
}
@Override
public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
{
return new UpgradedToSegmentsResponse(
toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUpgradedToSegmentIds(dataSource, segmentIds)
);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", segmentIds=" + segmentIds +
'}';
}
}
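To make the contract above concrete with hypothetical IDs: suppose segment P was upgraded to C1 and C2, and C2 has since been deleted from the metadata store while P and C1 remain. A sketch of the call and its result (toolbox and the IDs are placeholders):

final Map<String, Set<String>> upgradedTo = toolbox.getTaskActionClient()
    .submit(new RetrieveUpgradedToSegmentIdsAction("wiki", ImmutableSet.of("P")))
    .getUpgradedToSegmentIds();
// upgradedTo.get("P") -> {"P", "C1"} : P is still present and C1 was upgraded from it.
// C2 does not appear; if neither P nor any of its children remained, there would be no entry for P at all.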


@ -39,6 +39,8 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class),
@JsonSubTypes.Type(name = "retrieveUpgradedFromSegmentIds", value = RetrieveUpgradedFromSegmentIdsAction.class),
@JsonSubTypes.Type(name = "retrieveUpgradedToSegmentIds", value = RetrieveUpgradedToSegmentIdsAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),


@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
public class UpgradedFromSegmentsResponse
{
private final Map<String, String> upgradedFromSegmentIds;
@JsonCreator
public UpgradedFromSegmentsResponse(
@JsonProperty("upgradedFromSegmentIds") Map<String, String> upgradedFromSegmentIds
)
{
this.upgradedFromSegmentIds = upgradedFromSegmentIds;
}
@JsonProperty
public Map<String, String> getUpgradedFromSegmentIds()
{
return upgradedFromSegmentIds;
}
}


@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Set;
public class UpgradedToSegmentsResponse
{
private final Map<String, Set<String>> upgradedToSegmentIds;
@JsonCreator
public UpgradedToSegmentsResponse(
@JsonProperty("upgradedToSegmentIds") Map<String, Set<String>> upgradedToSegmentIds
)
{
this.upgradedToSegmentIds = upgradedToSegmentIds;
}
@JsonProperty
public Map<String, Set<String>> getUpgradedToSegmentIds()
{
return upgradedToSegmentIds;
}
}


@ -35,11 +35,14 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUpgradedFromSegmentIdsAction;
import org.apache.druid.indexing.common.actions.RetrieveUpgradedToSegmentIdsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.actions.UpgradedToSegmentsResponse;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -47,6 +50,8 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -54,6 +59,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -63,9 +69,23 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* <p/>
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
* <p/>
* The Kill task fetches the set of used segments for the interval and computes the set of their load specs. <br/>
* Until `limit` segments have been processed in total or all segments for the interval have been nuked:
* <ol>
* <li> Fetch at most `batchSize` unused segments from the metadata store. </li>
* <li> Determine the mapping from these segments to their parents *before* nuking the segments. </li>
* <li> Nuke the batch of unused segments from the metadata store. </li>
* <li> Determine the mapping of the set of parents to all their children. </li>
* <li> Check if unused or parent segments exist. </li>
* <li> Find the unreferenced segments. </li>
* <li> Filter the set of unreferenced segments using load specs from the set of used segments. </li>
* <li> Kill the filtered set of segments from deep storage. </li>
* </ol>
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
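The enumerated flow above reduces to one decision per unused segment: delete its file from deep storage only if its upgrade lineage has no surviving entries in the metadata store and its load spec is not shared with any used segment. A self-contained sketch of that decision with plain collections and hypothetical IDs and load specs (not the actual task code):

import java.util.Map;
import java.util.Set;

public class KillDecisionSketch
{
  public static void main(String[] args)
  {
    // Hypothetical: unused segment S1 was upgraded to S2; S2 shares S1's load spec and is still used.
    final Map<String, Object> sharedLoadSpec = Map.of("type", "local", "path", "/segments/s1.zip");
    final Set<Map<String, Object>> usedSegmentLoadSpecs = Set.of(sharedLoadSpec); // from used segments
    final Map<String, Set<String>> upgradedTo = Map.of("S1", Set.of("S2"));       // S2 still in metadata store

    final boolean lineageStillReferenced = !upgradedTo.getOrDefault("S1", Set.of()).isEmpty();
    final boolean loadSpecStillUsed = usedSegmentLoadSpecs.contains(sharedLoadSpec);

    // The metadata row of S1 is nuked either way; the file is deleted only if both checks are false.
    final boolean killFromDeepStorage = !lineageStillReferenced && !loadSpecStillUsed;
    System.out.println("Kill S1 from deep storage? " + killFromDeepStorage); // prints false here
  }
}

Either check alone is enough to keep the file, which is what prevents a kill from deleting data still served by an upgraded replica.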
@ -76,7 +96,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
* Default nuke batch size. This is a small enough size that we still get value from batching, while
* yielding as quickly as possible. In one real cluster environment backed with mysql, ~2000rows/sec,
* with batch size of 100, means a batch should only less than a second for the task lock, and depending
* on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time
* on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time,
* we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be yielding
* the task lockbox every 1-2 seconds.
*/
@ -97,13 +117,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
/**
* Maximum number of segments that can be killed.
*/
@Nullable private final Integer limit;
@Nullable
private final Integer limit;
/**
* The maximum used status last updated time. Any segments with
* {@code used_status_last_updated} no later than this time will be included in the kill task.
*/
@Nullable private final DateTime maxUsedStatusLastUpdatedTime;
@Nullable
private final DateTime maxUsedStatusLastUpdatedTime;
@JsonCreator
public KillUnusedSegmentsTask(
@ -196,18 +218,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);
final TaskActionClient taskActionClient = toolbox.getTaskActionClient();
RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
ImmutableList.of(getInterval()),
Segments.INCLUDING_OVERSHADOWED
);
// Fetch the load specs of all segments overlapping with the unused segment intervals
final Set<Map<String, Object>> usedSegmentLoadSpecs =
new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet())
);
final Set<Map<String, Object>> usedSegmentLoadSpecs = taskActionClient.submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet());
do {
if (nextBatchSize <= 0) {
@ -231,20 +252,47 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
);
}
// Kill segments
// Order is important here: we want the nuke action to clean up the metadata records _before_ the
// segments are removed from storage, this helps maintain that we will always have a storage segment if
// the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
// abandoned.
// Kill segments. Order is important here:
// Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
// We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
// This helps maintain that we will always have a storage segment if the metadata segment is present.
// Determine the subset of segments to be killed from deep storage based on loadspecs.
// If the segment nuke throws an exception, then the segment cleanup is abandoned.
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
// Determine upgraded segment ids before nuking
final Set<String> segmentIds = unusedSegments.stream()
.map(DataSegment::getId)
.map(SegmentId::toString)
.collect(Collectors.toSet());
final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
try {
upgradedFromSegmentIds.putAll(
taskActionClient.submit(
new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
).getUpgradedFromSegmentIds()
);
}
catch (Exception e) {
LOG.warn(
e,
"Could not retrieve parent segment ids using task action[retrieveUpgradedFromSegmentIds]."
+ " Overlord may be on an older version."
);
}
// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
.stream()
.filter(unusedSegment -> unusedSegment.getLoadSpec() == null
|| !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
.collect(Collectors.toList());
// Nuke Segments
taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
// Determine segments to be killed
final List<DataSegment> segmentsToBeKilled
= getKillableSegments(unusedSegments, upgradedFromSegmentIds, usedSegmentLoadSpecs, taskActionClient);
final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
segmentsToBeKilled.forEach(segmentsNotKilled::remove);
LOG.infoSegments(
segmentsNotKilled,
"Skipping segment kill from deep storage as their load specs are referenced by other segments."
);
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
numBatchesProcessed++;
@ -253,7 +301,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
} while (!unusedSegments.isEmpty() && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
final String taskId = getId();
LOG.info(
@ -300,6 +348,64 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return taskLockMap;
}
/**
* Determines the subset of segments that can be safely killed from deep storage because their load specs
* are not referenced elsewhere, by looking at segment upgrades and used segment load specs
* @param unusedSegments input segments
* @param upgradedFromSegmentIds segment to parent mapping
* @param usedSegmentLoadSpecs load specs of used segments
* @param taskActionClient task action client
* @return list of segments to kill from deep storage
*/
private List<DataSegment> getKillableSegments(
List<DataSegment> unusedSegments,
Map<String, String> upgradedFromSegmentIds,
Set<Map<String, Object>> usedSegmentLoadSpecs,
TaskActionClient taskActionClient
)
{
// Determine parentId for each unused segment
final Map<String, Set<DataSegment>> parentIdToUnusedSegments = new HashMap<>();
for (DataSegment segment : unusedSegments) {
final String segmentId = segment.getId().toString();
parentIdToUnusedSegments.computeIfAbsent(
upgradedFromSegmentIds.getOrDefault(segmentId, segmentId),
k -> new HashSet<>()
).add(segment);
}
// Check if the parent or any of its children exist in metadata store
try {
UpgradedToSegmentsResponse response = taskActionClient.submit(
new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet())
);
if (response != null && response.getUpgradedToSegmentIds() != null) {
response.getUpgradedToSegmentIds().forEach((parent, children) -> {
if (!CollectionUtils.isNullOrEmpty(children)) {
// Do not kill segment if its parent or any of its siblings still exist in metadata store
parentIdToUnusedSegments.remove(parent);
}
});
}
}
catch (Exception e) {
LOG.warn(
e,
"Could not retrieve referenced ids using task action[retrieveUpgradedToSegmentIds]."
+ " Overlord may be on an older version."
);
}
// Filter using the used segment load specs as segment upgrades predate the above task action
return parentIdToUnusedSegments.values()
.stream()
.flatMap(Set::stream)
.filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
.collect(Collectors.toList());
}
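A stripped-down sketch of the grouping step above, using plain strings instead of DataSegment (IDs are hypothetical): each unused segment is keyed under its root parent, and an original segment acts as its own root.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ParentGroupingSketch
{
  public static void main(String[] args)
  {
    // Hypothetical unused segments: U1 and U2 were upgraded from P; U3 is an original segment.
    final List<String> unusedSegmentIds = List.of("U1", "U2", "U3");
    final Map<String, String> upgradedFromSegmentIds = Map.of("U1", "P", "U2", "P");

    final Map<String, Set<String>> parentIdToUnusedSegments = new HashMap<>();
    for (String segmentId : unusedSegmentIds) {
      final String rootId = upgradedFromSegmentIds.getOrDefault(segmentId, segmentId);
      parentIdToUnusedSegments.computeIfAbsent(rootId, k -> new HashSet<>()).add(segmentId);
    }
    // Roots whose RetrieveUpgradedToSegmentIdsAction response is non-empty would then be dropped wholesale.
    System.out.println(parentIdToUnusedSegments); // e.g. {P=[U1, U2], U3=[U3]} (iteration order not guaranteed)
  }
}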
@Override
public LookupLoadingSpec getLookupLoadingSpec()
{


@ -61,6 +61,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
@ -81,7 +82,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
@ -130,6 +130,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private SegmentSchemaManager segmentSchemaManager;
private SegmentSchemaCache segmentSchemaCache;
private SupervisorManager supervisorManager;
private TestDataSegmentKiller dataSegmentKiller;
protected File reportsFile;
@Before
@ -169,6 +170,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper());
reportsFile = temporaryFolder.newFile();
dataSegmentKiller = new TestDataSegmentKiller();
}
@After
@ -243,6 +245,11 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
return testUtils.getRowIngestionMetersFactory();
}
public TestDataSegmentKiller getDataSegmentKiller()
{
return dataSegmentKiller;
}
public TaskActionToolbox createTaskActionToolbox()
{
storageCoordinator.start();
@ -265,7 +272,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(createActionClient(task))
.segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
.dataSegmentKiller(new NoopDataSegmentKiller())
.dataSegmentKiller(dataSegmentKiller)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(baseDir)
@ -450,7 +457,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(taskActionClient)
.segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
.dataSegmentKiller(new NoopDataSegmentKiller())
.dataSegmentKiller(dataSegmentKiller)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(baseDir)


@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
@ -72,10 +73,10 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
taskRunner = new TestTaskRunner();
final String version = DateTimes.nowUtc().toString();
segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version).withLoadSpec(ImmutableMap.of("k", 1));
segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version).withLoadSpec(ImmutableMap.of("k", 2));
segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version).withLoadSpec(ImmutableMap.of("k", 3));
segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version).withLoadSpec(ImmutableMap.of("k", 4));
}
@Test
@ -125,6 +126,212 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
new KillTaskReport.Stats(1, 2),
getReportedStats()
);
Assert.assertEquals(ImmutableSet.of(segment3), getDataSegmentKiller().getKilledSegments());
}
@Test
public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception
{
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
"nonExistentParent",
segment2.getId().toString(),
"nonExistentParent"
);
insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.ETERNITY)
.build();
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List<DataSegment> observedUnusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
null,
null,
null
);
Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
Assert.assertEquals(
new KillTaskReport.Stats(2, 2),
getReportedStats()
);
Assert.assertEquals(ImmutableSet.of(segment1, segment2), getDataSegmentKiller().getKilledSegments());
}
@Test
public void testKillSegmentsDoNotDeleteReferencedSibling() throws Exception
{
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
"nonExistentParent",
segment2.getId().toString(),
"nonExistentParent"
);
insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(segment1.getInterval())
.build();
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List<DataSegment> observedUnusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
null,
null,
null
);
Assert.assertEquals(Collections.singletonList(segment2), observedUnusedSegments);
Assert.assertEquals(
new KillTaskReport.Stats(0, 2),
getReportedStats()
);
Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
}
@Test
public void testKillSegmentsDoNotDeleteParentWithReferencedChildren() throws Exception
{
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
segment3.getId().toString(),
segment2.getId().toString(),
segment3.getId().toString()
);
insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.ETERNITY)
.build();
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List<DataSegment> observedUnusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
null,
null,
null
);
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
Assertions.assertThat(
getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
Segments.ONLY_VISIBLE
)
).containsExactlyInAnyOrder(segment1);
Assert.assertEquals(
new KillTaskReport.Stats(0, 2),
getReportedStats()
);
Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
}
@Test
public void testKillSegmentsDoNotDeleteChildrenWithReferencedParent() throws Exception
{
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
segment3.getId().toString(),
segment2.getId().toString(),
segment3.getId().toString()
);
insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.ETERNITY)
.build();
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List<DataSegment> observedUnusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
null,
null,
null
);
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
Assertions.assertThat(
getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
Segments.ONLY_VISIBLE
)
).containsExactlyInAnyOrder(segment3);
Assert.assertEquals(
new KillTaskReport.Stats(0, 2),
getReportedStats()
);
Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
}
@Test
public void testKillSegmentsDeleteChildrenAndParent() throws Exception
{
final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
segment1.getId().toString(),
segment3.getId().toString(),
segment2.getId().toString(),
segment3.getId().toString()
);
insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.ETERNITY)
.build();
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List<DataSegment> observedUnusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
DATA_SOURCE,
Intervals.ETERNITY,
null,
null,
null
);
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
Assert.assertEquals(
new KillTaskReport.Stats(3, 2),
getReportedStats()
);
Assert.assertEquals(ImmutableSet.of(segment1, segment2, segment3), getDataSegmentKiller().getKilledSegments());
}
@Test
@ -1247,4 +1454,16 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
10L
);
}
private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments(
segments,
upgradedFromSegmentIdMap,
derbyConnectorRule.getConnector(),
table,
getObjectMapper()
);
}
}


@ -22,12 +22,18 @@ package org.apache.druid.indexing.test;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.timeline.DataSegment;
import java.util.HashSet;
import java.util.Set;
public class TestDataSegmentKiller implements DataSegmentKiller
{
private final Set<DataSegment> killedSegments = new HashSet<>();
@Override
public void kill(DataSegment segment)
{
// do nothing
killedSegments.add(segment);
}
@Override
@ -35,4 +41,9 @@ public class TestDataSegmentKiller implements DataSegmentKiller
{
throw new UnsupportedOperationException("not implemented");
}
public Set<DataSegment> getKilledSegments()
{
return killedSegments;
}
}


@ -314,6 +314,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
throw new UnsupportedOperationException();
}
@Override
public Map<String, String> retrieveUpgradedFromSegmentIds(
final String dataSource,
final Set<String> segmentIds
)
{
return Collections.emptyMap();
}
@Override
public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
final String dataSource,
final Set<String> segmentIds
)
{
return Collections.emptyMap();
}
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);


@ -473,4 +473,21 @@ public interface IndexerMetadataStorageCoordinator
* @return List of pending segment records
*/
List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval);
/**
* Map from a segment ID to the segment ID from which it was upgraded.
* There should be no entry in the map for an original, non-upgraded segment.
* @param dataSource data source
* @param segmentIds ids of segments
*/
Map<String, String> retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds);
/**
* Map from a segment ID to a set containing
* 1) all segment IDs that were upgraded from it AND are still present in the metadata store
* 2) the segment ID itself if and only if it is still present in the metadata store
* @param dataSource data source
* @param segmentIds ids of the original segments whose load specs may be shared by upgraded segments
*/
Map<String, Set<String>> retrieveUpgradedToSegmentIds(String dataSource, Set<String> segmentIds);
}
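A hedged illustration of the two contracts, assuming segment A was upgraded to B and C, the row for B has since been deleted, and D is an unrelated original segment (coordinator stands for any IndexerMetadataStorageCoordinator implementation; IDs are hypothetical):

// Only the rows for A, C and D remain in the segments table.
Map<String, String> upgradedFrom =
    coordinator.retrieveUpgradedFromSegmentIds("wiki", Set.of("A", "B", "C", "D"));
// upgradedFrom -> {C=A} : B has no row left, while A and D are originals and therefore have no entry.

Map<String, Set<String>> upgradedTo =
    coordinator.retrieveUpgradedToSegmentIds("wiki", Set.of("A", "D"));
// upgradedTo -> {A=[A, C], D=[D]} : each key maps to itself while its own row exists, plus any surviving children.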


@ -564,6 +564,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask);
Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata = new HashMap<>();
final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
for (DataSegmentPlus dataSegmentPlus : upgradedSegments) {
segmentsToInsert.add(dataSegmentPlus.getDataSegment());
if (dataSegmentPlus.getSchemaFingerprint() != null && dataSegmentPlus.getNumRows() != null) {
@ -572,6 +573,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
new SegmentMetadata(dataSegmentPlus.getNumRows(), dataSegmentPlus.getSchemaFingerprint())
);
}
if (dataSegmentPlus.getUpgradedFromSegmentId() != null) {
upgradedFromSegmentIdMap.put(
dataSegmentPlus.getDataSegment().getId().toString(),
dataSegmentPlus.getUpgradedFromSegmentId()
);
}
}
SegmentPublishResult result = SegmentPublishResult.ok(
insertSegments(
@ -579,7 +586,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
segmentsToInsert,
segmentSchemaMapping,
upgradeSegmentMetadata,
Collections.emptyMap()
Collections.emptyMap(),
upgradedFromSegmentIdMap
),
upgradePendingSegmentsOverlappingWith(segmentsToInsert)
);
@ -1408,6 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
final Map<SegmentId, SegmentId> newVersionSegmentToParent = new HashMap<>();
final Map<String, DataSegment> segmentIdMap = new HashMap<>();
final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
segmentIdsForNewVersions.forEach(
pendingSegment -> {
@ -1415,6 +1424,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
final SegmentId newVersionSegmentId = pendingSegment.getId().asSegmentId();
newVersionSegmentToParent.put(newVersionSegmentId, oldSegment.getId());
upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), oldSegment.getId().toString());
allSegmentsToInsert.add(
new DataSegment(
pendingSegment.getId().asSegmentId(),
@ -1473,7 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allSegmentsToInsert,
segmentSchemaMapping,
Collections.emptyMap(),
newVersionSegmentToParent
newVersionSegmentToParent,
upgradedFromSegmentIdMap
)
);
},
@ -2092,7 +2103,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("version", segment.getVersion())
.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", now);
.bind("used_status_last_updated", now)
.bind("upgraded_from_segment_id", (String) null);
if (schemaPersistEnabled) {
Long numRows = null;
@ -2217,6 +2229,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.shardSpec(shardSpec)
.build();
// When the segment already has an upgraded_from_segment_id, reuse it for its children
final String upgradedFromSegmentId = oldSegmentMetadata.getUpgradedFromSegmentId() == null
? oldSegmentMetadata.getDataSegment().getId().toString()
: oldSegmentMetadata.getUpgradedFromSegmentId();
upgradedSegments.add(
new DataSegmentPlus(
dataSegment,
@ -2224,7 +2241,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
null,
null,
oldSegmentMetadata.getSchemaFingerprint(),
oldSegmentMetadata.getNumRows())
oldSegmentMetadata.getNumRows(),
upgradedFromSegmentId
)
);
}
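A small sketch of the carry-forward above with hypothetical IDs: after two successive REPLACE upgrades of the same original segment, every descendant row still records the root.

import java.util.Map;

class UpgradeChainSketch
{
  // A is original; a REPLACE upgrades it to B, and a later REPLACE upgrades B to C.
  // B's existing upgraded_from_segment_id is reused, so C points at A rather than B.
  static final Map<String, String> UPGRADED_FROM_SEGMENT_ID = Map.of(
      "B", "A",
      "C", "A"
  );
}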
@ -2266,7 +2285,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Set<DataSegment> segments,
@Nullable SegmentSchemaMapping segmentSchemaMapping,
Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata,
Map<SegmentId, SegmentId> newVersionForAppendToParent
Map<SegmentId, SegmentId> newVersionForAppendToParent,
Map<String, String> upgradedFromSegmentIdMap
) throws IOException
{
boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
@ -2302,7 +2322,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", now);
.bind("used_status_last_updated", now)
.bind("upgraded_from_segment_id", upgradedFromSegmentIdMap.get(segment.getId().toString()));
if (schemaPersistEnabled) {
SegmentMetadata segmentMetadata =
@ -2449,9 +2470,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
{
String insertStatement =
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,"
+ " partitioned, version, used, payload, used_status_last_updated %3$s) "
+ " partitioned, version, used, payload, used_status_last_updated, upgraded_from_segment_id %3$s) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end,"
+ " :partitioned, :version, :used, :payload, :used_status_last_updated %4$s)";
+ " :partitioned, :version, :used, :payload, :used_status_last_updated, :upgraded_from_segment_id %4$s)";
if (schemaPersistEnabled) {
return StringUtils.format(
@ -2923,6 +2944,87 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
@Override
public Map<String, String> retrieveUpgradedFromSegmentIds(
final String dataSource,
final Set<String> segmentIds
)
{
if (segmentIds.isEmpty()) {
return Collections.emptyMap();
}
final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
final String sql = StringUtils.format(
"SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList)
);
final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource);
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query);
return query.map((index, r, ctx) -> {
final String id = r.getString(1);
final String upgradedFromSegmentId = r.getString(2);
if (upgradedFromSegmentId != null) {
upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
}
return null;
}).list();
}
);
return upgradedFromSegmentIds;
}
@Override
public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
final String dataSource,
final Set<String> segmentIds
)
{
if (segmentIds.isEmpty()) {
return Collections.emptyMap();
}
final List<String> upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds);
final String sql = StringUtils.format(
"SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
"upgraded_from_segment_id",
upgradedFromSegmentIdList
)
);
final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
retrieveSegmentsById(dataSource, segmentIds)
.stream()
.map(DataSegment::getId)
.map(SegmentId::toString)
.forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id));
connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource);
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
"upgraded_from_segment_id",
upgradedFromSegmentIdList,
query
);
return query.map((index, r, ctx) -> {
final String upgradedToId = r.getString(1);
final String id = r.getString(2);
upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
.add(upgradedToId);
return null;
}).list();
}
);
return upgradedToSegmentIds;
}
private static class PendingSegmentsRecord
{
private final String sequenceName;


@ -40,7 +40,10 @@ import java.sql.ResultSet;
* <li> id -> id (Unique identifier for pending segment) <li/>
* <li> sequence_name -> sequenceName (sequence name used for segment allocation) <li/>
* <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) <li/>
* <li> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded) <li/>
* <li> upgraded_from_segment_id -> upgradedFromSegmentId
* (ID of the segment which was upgraded to create the current segment.
* If the former was itself created as a result of an upgrade, then this ID
* must refer to the original non-upgraded segment in the hierarchy.) <li/>
* <li> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment) <li/>
* </ul>
*/


@ -587,6 +587,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
Map<String, String> columnNameTypes = new HashMap<>();
columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)");
if (centralizedDatasourceSchemaConfig.isEnabled()) {
columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
columnNameTypes.put("num_rows", "BIGINT");
@ -619,6 +621,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
}
alterTable(tableName, alterCommands);
final Set<String> createdIndexSet = getIndexOnTable(tableName);
createIndex(
tableName,
StringUtils.format("idx_%1$s_datasource_upgraded_from_segment_id", tableName),
ImmutableList.of("dataSource", "upgraded_from_segment_id"),
createdIndexSet
);
}
@Override


@ -286,7 +286,7 @@ public class SqlSegmentsMetadataQuery
if (includeSchemaInfo) {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used, schema_fingerprint, num_rows FROM %s WHERE dataSource = :dataSource %s",
"SELECT payload, used, schema_fingerprint, num_rows, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);
@ -306,7 +306,8 @@ public class SqlSegmentsMetadataQuery
null,
r.getBoolean(2),
schemaFingerprint,
numRows
numRows,
r.getString(5)
);
}
)
@ -314,7 +315,7 @@ public class SqlSegmentsMetadataQuery
} else {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
"SELECT payload, used, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);
@ -331,7 +332,8 @@ public class SqlSegmentsMetadataQuery
null,
r.getBoolean(2),
null,
null
null,
r.getString(3)
)
)
.iterator();
@ -864,6 +866,7 @@ public class SqlSegmentsMetadataQuery
DateTimes.of(r.getString(3)),
null,
null,
null,
null
))
.iterator();
@ -980,7 +983,7 @@ public class SqlSegmentsMetadataQuery
*
* @see #getParameterizedInConditionForColumn(String, List)
*/
private static void bindColumnValuesToQueryWithInCondition(
static void bindColumnValuesToQueryWithInCondition(
final String columnName,
final List<String> values,
final SQLStatement<?> query


@ -36,6 +36,8 @@ import java.util.Objects;
* <li>{@link DataSegmentPlus#createdDate} - The time when the segment was created.</li>
* <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate} - The time when the segments
* used status was last updated.</li>
* <li>{@link DataSegmentPlus#upgradedFromSegmentId} - The segment id to which the same load spec originally belonged.
* Load specs can be shared as a result of segment version upgrades.</li>
* </ul>
* <p>
* This class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}.
@ -53,6 +55,9 @@ public class DataSegmentPlus
private final String schemaFingerprint;
private final Long numRows;
@Nullable
private final String upgradedFromSegmentId;
@JsonCreator
public DataSegmentPlus(
@JsonProperty("dataSegment") final DataSegment dataSegment,
@ -60,7 +65,8 @@ public class DataSegmentPlus
@JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime usedStatusLastUpdatedDate,
@JsonProperty("used") @Nullable final Boolean used,
@JsonProperty("schemaFingerprint") @Nullable final String schemaFingerprint,
@JsonProperty("numRows") @Nullable final Long numRows
@JsonProperty("numRows") @Nullable final Long numRows,
@JsonProperty("upgradedFromSegmentId") @Nullable final String upgradedFromSegmentId
)
{
this.dataSegment = dataSegment;
@ -69,6 +75,7 @@ public class DataSegmentPlus
this.used = used;
this.schemaFingerprint = schemaFingerprint;
this.numRows = numRows;
this.upgradedFromSegmentId = upgradedFromSegmentId;
}
@Nullable
@ -112,6 +119,13 @@ public class DataSegmentPlus
return numRows;
}
@Nullable
@JsonProperty
public String getUpgradedFromSegmentId()
{
return upgradedFromSegmentId;
}
@Override
public boolean equals(Object o)
{
@ -127,7 +141,8 @@ public class DataSegmentPlus
&& Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate())
&& Objects.equals(used, that.getUsed())
&& Objects.equals(schemaFingerprint, that.getSchemaFingerprint())
&& Objects.equals(numRows, that.getNumRows());
&& Objects.equals(numRows, that.getNumRows())
&& Objects.equals(upgradedFromSegmentId, that.getUpgradedFromSegmentId());
}
@Override
@ -139,7 +154,8 @@ public class DataSegmentPlus
usedStatusLastUpdatedDate,
used,
schemaFingerprint,
numRows
numRows,
upgradedFromSegmentId
);
}
@ -153,6 +169,7 @@ public class DataSegmentPlus
", used=" + getUsed() +
", schemaFingerprint=" + getSchemaFingerprint() +
", numRows=" + getNumRows() +
", upgradedFromSegmentId=" + getUpgradedFromSegmentId() +
'}';
}
}


@ -138,8 +138,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
final String v1 = "2023-01-01";
final String v2 = "2023-01-02";
final String v3 = "2023-01-03";
final String alreadyUpgradedVersion = "2023-02-01";
final String lockVersion = "2024-01-01";
final String taskAllocatorId = "appendTask";
final String replaceTaskId = "replaceTask1";
final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
replaceTaskId,
@ -148,6 +150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
final Set<DataSegment> appendSegments = new HashSet<>();
final List<PendingSegmentRecord> pendingSegmentsForTask = new ArrayList<>();
final Set<DataSegment> expectedSegmentsToUpgrade = new HashSet<>();
for (int i = 0; i < 10; i++) {
final DataSegment segment = createSegment(
@ -157,6 +160,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
// Add the same segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
SegmentIdWithShardSpec.fromDataSegment(segment),
v1,
segment.getId().toString(),
null,
taskAllocatorId
)
);
// Add upgraded pending segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
DS.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(i, 0)
),
alreadyUpgradedVersion,
segment.getId().toString(),
segment.getId().toString(),
taskAllocatorId
)
);
}
for (int i = 0; i < 10; i++) {
@ -167,6 +195,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
// Add the same segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
SegmentIdWithShardSpec.fromDataSegment(segment),
v2,
segment.getId().toString(),
null,
taskAllocatorId
)
);
// Add upgraded pending segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
DS.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(10 + i, 0)
),
alreadyUpgradedVersion,
segment.getId().toString(),
segment.getId().toString(),
taskAllocatorId
)
);
}
for (int i = 0; i < 10; i++) {
@ -176,23 +229,78 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
new LinearShardSpec(i)
);
appendSegments.add(segment);
// Add the same segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
SegmentIdWithShardSpec.fromDataSegment(segment),
v3,
segment.getId().toString(),
null,
taskAllocatorId
)
);
// Add upgraded pending segment
pendingSegmentsForTask.add(
new PendingSegmentRecord(
new SegmentIdWithShardSpec(
DS.WIKI,
Intervals.of("2023-01-01/2023-02-01"),
alreadyUpgradedVersion,
new NumberedShardSpec(20 + i, 0)
),
alreadyUpgradedVersion,
segment.getId().toString(),
segment.getId().toString(),
taskAllocatorId
)
);
}
derbyConnector.retryWithHandle(
handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, DS.WIKI, false)
);
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= expectedSegmentsToUpgrade.stream()
.collect(Collectors.toMap(s -> s, s -> replaceLock));
// Commit the segment and verify the results
SegmentPublishResult commitResult
= coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append", null);
= coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, taskAllocatorId, null);
Assert.assertTrue(commitResult.isSuccess());
Assert.assertEquals(appendSegments, commitResult.getSegments());
// Verify the segments present in the metadata store
Assert.assertEquals(
appendSegments,
ImmutableSet.copyOf(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()))
Set<DataSegment> allCommittedSegments
= new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
DS.WIKI,
allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
);
// Verify the segments present in the metadata store
Assert.assertTrue(allCommittedSegments.containsAll(appendSegments));
for (DataSegment segment : appendSegments) {
Assert.assertNull(upgradedFromSegmentIdMap.get(segment.getId().toString()));
}
allCommittedSegments.removeAll(appendSegments);
// Verify the commit of upgraded pending segments
Assert.assertEquals(appendSegments.size(), allCommittedSegments.size());
Map<String, DataSegment> segmentMap = new HashMap<>();
for (DataSegment segment : appendSegments) {
segmentMap.put(segment.getId().toString(), segment);
}
for (DataSegment segment : allCommittedSegments) {
for (PendingSegmentRecord pendingSegmentRecord : pendingSegmentsForTask) {
if (pendingSegmentRecord.getId().asSegmentId().toString().equals(segment.getId().toString())) {
DataSegment upgradedFromSegment = segmentMap.get(pendingSegmentRecord.getUpgradedFromSegmentId());
Assert.assertNotNull(upgradedFromSegment);
Assert.assertEquals(segment.getLoadSpec(), upgradedFromSegment.getLoadSpec());
Assert.assertEquals(
pendingSegmentRecord.getUpgradedFromSegmentId(),
upgradedFromSegmentIdMap.get(segment.getId().toString())
);
}
}
}
// Verify entries in the segment task lock table
final Set<String> expectedUpgradeSegmentIds
@ -290,12 +398,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()).size()
);
final Set<DataSegment> usedSegments = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
final Set<DataSegment> usedSegments
= new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
final Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
"foo",
usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
);
Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
for (DataSegment appendSegment : segmentsAppendedWithReplaceLock) {
Assert.assertNull(upgradedFromSegmentIdMap.get(appendSegment.getId().toString()));
}
usedSegments.removeAll(segmentsAppendedWithReplaceLock);
Assert.assertTrue(usedSegments.containsAll(replacingSegments));
for (DataSegment replaceSegment : replacingSegments) {
Assert.assertNull(upgradedFromSegmentIdMap.get(replaceSegment.getId().toString()));
}
usedSegments.removeAll(replacingSegments);
Assert.assertEquals(segmentsAppendedWithReplaceLock.size(), usedSegments.size());
@ -303,6 +423,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
boolean hasBeenCarriedForward = false;
for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
if (appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec())) {
Assert.assertEquals(
appendedSegment.getId().toString(),
upgradedFromSegmentIdMap.get(segmentReplicaWithNewVersion.getId().toString())
);
hasBeenCarriedForward = true;
break;
}
@ -3300,4 +3424,63 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
unusedSegmentIdsForIntervalAndVersion.get(0)
);
}
@Test
public void testRetrieveUpgradedFromSegmentIds()
{
final String datasource = defaultSegment.getDataSource();
final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
upgradedFromSegmentIdMap.clear();
upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
Map<String, String> expected = new HashMap<>();
expected.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
expected.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
Set<String> segmentIds = new HashSet<>();
segmentIds.add(defaultSegment.getId().toString());
segmentIds.add(defaultSegment2.getId().toString());
segmentIds.add(defaultSegment3.getId().toString());
segmentIds.add(defaultSegment4.getId().toString());
Assert.assertEquals(
expected,
coordinator.retrieveUpgradedFromSegmentIds(datasource, segmentIds)
);
}
@Test
public void testRetrieveUpgradedToSegmentIds()
{
final String datasource = defaultSegment.getDataSource();
final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
upgradedFromSegmentIdMap.clear();
upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
Map<String, Set<String>> expected = new HashMap<>();
expected.put(defaultSegment.getId().toString(), new HashSet<>());
expected.get(defaultSegment.getId().toString()).add(defaultSegment.getId().toString());
expected.get(defaultSegment.getId().toString()).add(defaultSegment2.getId().toString());
expected.get(defaultSegment.getId().toString()).add(defaultSegment3.getId().toString());
Set<String> upgradedIds = new HashSet<>();
upgradedIds.add(defaultSegment.getId().toString());
Assert.assertEquals(
expected,
coordinator.retrieveUpgradedToSegmentIds(datasource, upgradedIds)
);
}
private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnector, table, mapper);
}
}


@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -322,6 +323,8 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
.version(version)
.shardSpec(shardSpec)
.size(100)
// hash to get a unique load spec as segmentId has not yet been generated
.loadSpec(ImmutableMap.of("hash", Objects.hash(interval, version, shardSpec)))
.build();
}
@ -559,4 +562,50 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
}
);
}
public static void insertUsedSegments(
Set<DataSegment> dataSegments,
Map<String, String> upgradedFromSegmentIdMap,
SQLMetadataConnector connector,
String table,
ObjectMapper jsonMapper
)
{
connector.retryWithHandle(
handle -> {
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version,"
+ " used, payload, used_status_last_updated, upgraded_from_segment_id) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version,"
+ " :used, :payload, :used_status_last_updated, :upgraded_from_segment_id)",
table,
connector.getQuoteString()
)
);
for (DataSegment segment : dataSegments) {
String id = segment.getId().toString();
preparedBatch.add()
.bind("id", id)
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
.bind("upgraded_from_segment_id", upgradedFromSegmentIdMap.get(segment.getId().toString()));
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (!succeeded) {
throw new ISE("Failed to publish segments to DB");
}
return true;
}
);
}
}


@ -100,6 +100,7 @@ public class DataSegmentPlusTest
usedStatusLastUpdatedDate,
null,
null,
null,
null
);
@ -108,7 +109,7 @@ public class DataSegmentPlusTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(6, objectMap.size());
Assert.assertEquals(7, objectMap.size());
final Map<String, Object> segmentObjectMap = MAPPER.readValue(
MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT


@ -77,7 +77,7 @@ public class MetadataResourceTest
.toArray(new DataSegment[0]);
private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
.map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null))
.map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null, null))
.collect(Collectors.toList());
private HttpServletRequest request;
private SegmentsMetadataManager segmentsMetadataManager;