mirror of https://github.com/apache/druid.git
Remove deprecated UnknownSegmentIdsException (#16112)
Changes - Replace usages of `UnknownSegmentIdsException` with `DruidException` - Add method `SqlMetadataQuery.retrieveSegments` - Add new field `used` to `DataSegmentPlus`
This commit is contained in:
parent
fb7bb0953d
commit
82fced571b
|
@ -50,6 +50,7 @@ import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||||
import org.apache.druid.segment.SegmentUtils;
|
import org.apache.druid.segment.SegmentUtils;
|
||||||
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
|
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
|
||||||
|
import org.apache.druid.server.http.DataSegmentPlus;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.Partitions;
|
import org.apache.druid.timeline.Partitions;
|
||||||
import org.apache.druid.timeline.SegmentTimeline;
|
import org.apache.druid.timeline.SegmentTimeline;
|
||||||
|
@ -2035,6 +2036,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String datasource = replaceSegments.iterator().next().getDataSource();
|
||||||
|
|
||||||
// For each replace interval, find the number of core partitions and total partitions
|
// For each replace interval, find the number of core partitions and total partitions
|
||||||
final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
|
final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
|
||||||
final Map<Interval, Integer> intervalToCurrentPartitionNum = new HashMap<>();
|
final Map<Interval, Integer> intervalToCurrentPartitionNum = new HashMap<>();
|
||||||
|
@ -2055,7 +2058,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
final Map<String, String> upgradeSegmentToLockVersion
|
final Map<String, String> upgradeSegmentToLockVersion
|
||||||
= getAppendSegmentsCommittedDuringTask(handle, taskId);
|
= getAppendSegmentsCommittedDuringTask(handle, taskId);
|
||||||
final List<DataSegment> segmentsToUpgrade
|
final List<DataSegment> segmentsToUpgrade
|
||||||
= retrieveSegmentsById(handle, upgradeSegmentToLockVersion.keySet());
|
= retrieveSegmentsById(handle, datasource, upgradeSegmentToLockVersion.keySet());
|
||||||
|
|
||||||
if (segmentsToUpgrade.isEmpty()) {
|
if (segmentsToUpgrade.isEmpty()) {
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
|
@ -2242,30 +2245,17 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<DataSegment> retrieveSegmentsById(Handle handle, Set<String> segmentIds)
|
private List<DataSegment> retrieveSegmentsById(Handle handle, String datasource, Set<String> segmentIds)
|
||||||
{
|
{
|
||||||
if (segmentIds.isEmpty()) {
|
if (segmentIds.isEmpty()) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String segmentIdCsv = segmentIds.stream()
|
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
|
||||||
.map(id -> "'" + id + "'")
|
.retrieveSegmentsById(datasource, segmentIds)
|
||||||
.collect(Collectors.joining(","));
|
.stream()
|
||||||
ResultIterator<DataSegment> resultIterator = handle
|
.map(DataSegmentPlus::getDataSegment)
|
||||||
.createQuery(
|
.collect(Collectors.toList());
|
||||||
StringUtils.format(
|
|
||||||
"SELECT payload FROM %s WHERE id in (%s)",
|
|
||||||
dbTables.getSegmentsTable(), segmentIdCsv
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.setFetchSize(connector.getStreamingFetchSize())
|
|
||||||
.map(
|
|
||||||
(index, r, ctx) ->
|
|
||||||
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class)
|
|
||||||
)
|
|
||||||
.iterator();
|
|
||||||
|
|
||||||
return Lists.newArrayList(resultIterator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildSqlToInsertSegments()
|
private String buildSqlToInsertSegments()
|
||||||
|
|
|
@ -55,8 +55,17 @@ public interface SegmentsMetadataManager
|
||||||
|
|
||||||
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval);
|
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval);
|
||||||
|
|
||||||
int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)
|
/**
|
||||||
throws UnknownSegmentIdsException;
|
* Marks the given segment IDs as "used" only if there are not already overshadowed
|
||||||
|
* by other used segments. Qualifying segment IDs that are already marked as
|
||||||
|
* "used" are not updated.
|
||||||
|
*
|
||||||
|
* @return Number of segments updated
|
||||||
|
* @throws org.apache.druid.error.DruidException of category INVALID_INPUT if
|
||||||
|
* any of the given segment IDs
|
||||||
|
* do not exist in the metadata store.
|
||||||
|
*/
|
||||||
|
int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
|
* Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
|
||||||
|
|
|
@ -35,6 +35,8 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import org.apache.druid.client.DataSourcesSnapshot;
|
import org.apache.druid.client.DataSourcesSnapshot;
|
||||||
import org.apache.druid.client.ImmutableDruidDataSource;
|
import org.apache.druid.client.ImmutableDruidDataSource;
|
||||||
|
import org.apache.druid.error.DruidException;
|
||||||
|
import org.apache.druid.error.InvalidInput;
|
||||||
import org.apache.druid.guice.ManageLifecycle;
|
import org.apache.druid.guice.ManageLifecycle;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
@ -718,7 +720,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int markAsUsedNonOvershadowedSegments(final String dataSource, final Set<String> segmentIds)
|
public int markAsUsedNonOvershadowedSegments(final String dataSource, final Set<String> segmentIds)
|
||||||
throws UnknownSegmentIdsException
|
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Pair<List<DataSegment>, SegmentTimeline> unusedSegmentsAndTimeline = connector
|
Pair<List<DataSegment>, SegmentTimeline> unusedSegmentsAndTimeline = connector
|
||||||
|
@ -744,8 +745,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
Throwable rootCause = Throwables.getRootCause(e);
|
Throwable rootCause = Throwables.getRootCause(e);
|
||||||
if (rootCause instanceof UnknownSegmentIdsException) {
|
if (rootCause instanceof DruidException) {
|
||||||
throw (UnknownSegmentIdsException) rootCause;
|
throw (DruidException) rootCause;
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -756,58 +757,30 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
|
||||||
final String dataSource,
|
final String dataSource,
|
||||||
final Set<String> segmentIds,
|
final Set<String> segmentIds,
|
||||||
final Handle handle
|
final Handle handle
|
||||||
) throws UnknownSegmentIdsException
|
)
|
||||||
{
|
{
|
||||||
List<String> unknownSegmentIds = new ArrayList<>();
|
final List<DataSegmentPlus> retrievedSegments = SqlSegmentsMetadataQuery
|
||||||
List<DataSegment> segments = segmentIds
|
.forHandle(handle, connector, dbTables.get(), jsonMapper)
|
||||||
.stream()
|
.retrieveSegmentsById(dataSource, segmentIds);
|
||||||
.map(
|
|
||||||
segmentId -> {
|
final Set<String> unknownSegmentIds = new HashSet<>(segmentIds);
|
||||||
Iterator<DataSegment> segmentResultIterator = handle
|
final List<DataSegment> unusedSegments = new ArrayList<>();
|
||||||
.createQuery(
|
for (DataSegmentPlus entry : retrievedSegments) {
|
||||||
StringUtils.format(
|
final DataSegment segment = entry.getDataSegment();
|
||||||
"SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
|
unknownSegmentIds.remove(segment.getId().toString());
|
||||||
getSegmentsTable()
|
if (Boolean.FALSE.equals(entry.getUsed())) {
|
||||||
)
|
unusedSegments.add(segment);
|
||||||
)
|
}
|
||||||
.bind("dataSource", dataSource)
|
|
||||||
.bind("id", segmentId)
|
|
||||||
.map((int index, ResultSet resultSet, StatementContext context) -> {
|
|
||||||
try {
|
|
||||||
if (!resultSet.getBoolean("used")) {
|
|
||||||
return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
|
|
||||||
} else {
|
|
||||||
// We emit nulls for used segments. They are filtered out below in this method.
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.iterator();
|
|
||||||
if (!segmentResultIterator.hasNext()) {
|
|
||||||
unknownSegmentIds.add(segmentId);
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
@Nullable DataSegment segment = segmentResultIterator.next();
|
|
||||||
if (segmentResultIterator.hasNext()) {
|
|
||||||
log.error(
|
|
||||||
"There is more than one row corresponding to segment id [%s] in data source [%s] in the database",
|
|
||||||
segmentId,
|
|
||||||
dataSource
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return segment;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
.filter(Objects::nonNull) // Filter nulls corresponding to used segments.
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
if (!unknownSegmentIds.isEmpty()) {
|
|
||||||
throw new UnknownSegmentIdsException(unknownSegmentIds);
|
|
||||||
}
|
}
|
||||||
return segments;
|
|
||||||
|
if (!unknownSegmentIds.isEmpty()) {
|
||||||
|
throw InvalidInput.exception(
|
||||||
|
"Could not find segment IDs[%s] for datasource[%s]",
|
||||||
|
unknownSegmentIds, dataSource
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return unusedSegments;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CloseableIterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(
|
private CloseableIterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(
|
||||||
|
|
|
@ -44,13 +44,13 @@ import org.skife.jdbi.v2.Query;
|
||||||
import org.skife.jdbi.v2.ResultIterator;
|
import org.skife.jdbi.v2.ResultIterator;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -227,6 +227,49 @@ public class SqlSegmentsMetadataQuery
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<DataSegmentPlus> retrieveSegmentsById(String datasource, Set<String> segmentIds)
|
||||||
|
{
|
||||||
|
final List<List<String>> partitionedSegmentIds
|
||||||
|
= Lists.partition(new ArrayList<>(segmentIds), 100);
|
||||||
|
|
||||||
|
final List<DataSegmentPlus> fetchedSegments = new ArrayList<>(segmentIds.size());
|
||||||
|
for (List<String> partition : partitionedSegmentIds) {
|
||||||
|
fetchedSegments.addAll(retrieveSegmentBatchById(datasource, partition));
|
||||||
|
}
|
||||||
|
return fetchedSegments;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds)
|
||||||
|
{
|
||||||
|
if (segmentIds.isEmpty()) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String segmentIdCsv = segmentIds.stream()
|
||||||
|
.map(id -> "'" + id + "'")
|
||||||
|
.collect(Collectors.joining(","));
|
||||||
|
ResultIterator<DataSegmentPlus> resultIterator = handle
|
||||||
|
.createQuery(
|
||||||
|
StringUtils.format(
|
||||||
|
"SELECT payload, used FROM %s WHERE dataSource = :dataSource AND id IN (%s)",
|
||||||
|
dbTables.getSegmentsTable(), segmentIdCsv
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.bind("dataSource", datasource)
|
||||||
|
.setFetchSize(connector.getStreamingFetchSize())
|
||||||
|
.map(
|
||||||
|
(index, r, ctx) -> new DataSegmentPlus(
|
||||||
|
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
r.getBoolean(2)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.iterator();
|
||||||
|
|
||||||
|
return Lists.newArrayList(resultIterator);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks the provided segments as either used or unused.
|
* Marks the provided segments as either used or unused.
|
||||||
*
|
*
|
||||||
|
@ -335,7 +378,6 @@ public class SqlSegmentsMetadataQuery
|
||||||
*/
|
*/
|
||||||
public DataSegment retrieveUsedSegmentForId(String id)
|
public DataSegment retrieveUsedSegmentForId(String id)
|
||||||
{
|
{
|
||||||
|
|
||||||
final String query = "SELECT payload FROM %s WHERE used = true AND id = :id";
|
final String query = "SELECT payload FROM %s WHERE used = true AND id = :id";
|
||||||
|
|
||||||
final Query<Map<String, Object>> sql = handle
|
final Query<Map<String, Object>> sql = handle
|
||||||
|
@ -358,7 +400,6 @@ public class SqlSegmentsMetadataQuery
|
||||||
*/
|
*/
|
||||||
public DataSegment retrieveSegmentForId(String id)
|
public DataSegment retrieveSegmentForId(String id)
|
||||||
{
|
{
|
||||||
|
|
||||||
final String query = "SELECT payload FROM %s WHERE id = :id";
|
final String query = "SELECT payload FROM %s WHERE id = :id";
|
||||||
|
|
||||||
final Query<Map<String, Object>> sql = handle
|
final Query<Map<String, Object>> sql = handle
|
||||||
|
@ -707,7 +748,8 @@ public class SqlSegmentsMetadataQuery
|
||||||
return sql.map((index, r, ctx) -> new DataSegmentPlus(
|
return sql.map((index, r, ctx) -> new DataSegmentPlus(
|
||||||
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
|
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
|
||||||
DateTimes.of(r.getString(2)),
|
DateTimes.of(r.getString(2)),
|
||||||
DateTimes.of(r.getString(3))
|
DateTimes.of(r.getString(3)),
|
||||||
|
null
|
||||||
))
|
))
|
||||||
.iterator();
|
.iterator();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.metadata;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Exception thrown by {@link SegmentsMetadataManager} when a segment id is unknown.
|
|
||||||
*
|
|
||||||
* @deprecated Usages of this exception will be replaced by the new
|
|
||||||
* {@link org.apache.druid.error.DruidException} in a future release.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public class UnknownSegmentIdsException extends Exception
|
|
||||||
{
|
|
||||||
private final Collection<String> unknownSegmentIds;
|
|
||||||
|
|
||||||
UnknownSegmentIdsException(Collection<String> segmentIds)
|
|
||||||
{
|
|
||||||
super("Cannot find segment ids " + segmentIds);
|
|
||||||
this.unknownSegmentIds = segmentIds;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Collection<String> getUnknownSegmentIds()
|
|
||||||
{
|
|
||||||
return unknownSegmentIds;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -31,10 +31,14 @@ import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulates a {@link DataSegment} and additional metadata about it:
|
* Encapsulates a {@link DataSegment} and additional metadata about it:
|
||||||
* {@link DataSegmentPlus#createdDate}: The time when the segment was created </li>
|
* <ul>
|
||||||
* {@link DataSegmentPlus#usedStatusLastUpdatedDate}: The time when the segments used status was last updated </li>
|
* <li>{@link DataSegmentPlus#used} - Boolean flag representing if the segment is used.</li>
|
||||||
|
* <li>{@link DataSegmentPlus#createdDate} - The time when the segment was created.</li>
|
||||||
|
* <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate} - The time when the segments
|
||||||
|
* used status was last updated.</li>
|
||||||
|
* </ul>
|
||||||
* <p>
|
* <p>
|
||||||
* The class closesly resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}
|
* This class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
@UnstableApi
|
@UnstableApi
|
||||||
|
@ -44,19 +48,23 @@ public class DataSegmentPlus
|
||||||
private final DateTime createdDate;
|
private final DateTime createdDate;
|
||||||
@Nullable
|
@Nullable
|
||||||
private final DateTime usedStatusLastUpdatedDate;
|
private final DateTime usedStatusLastUpdatedDate;
|
||||||
|
private final Boolean used;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public DataSegmentPlus(
|
public DataSegmentPlus(
|
||||||
@JsonProperty("dataSegment") final DataSegment dataSegment,
|
@JsonProperty("dataSegment") final DataSegment dataSegment,
|
||||||
@JsonProperty("createdDate") final DateTime createdDate,
|
@JsonProperty("createdDate") @Nullable final DateTime createdDate,
|
||||||
@JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime usedStatusLastUpdatedDate
|
@JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime usedStatusLastUpdatedDate,
|
||||||
|
@JsonProperty("used") @Nullable final Boolean used
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataSegment = dataSegment;
|
this.dataSegment = dataSegment;
|
||||||
this.createdDate = createdDate;
|
this.createdDate = createdDate;
|
||||||
this.usedStatusLastUpdatedDate = usedStatusLastUpdatedDate;
|
this.usedStatusLastUpdatedDate = usedStatusLastUpdatedDate;
|
||||||
|
this.used = used;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public DateTime getCreatedDate()
|
public DateTime getCreatedDate()
|
||||||
{
|
{
|
||||||
|
@ -76,6 +84,13 @@ public class DataSegmentPlus
|
||||||
return dataSegment;
|
return dataSegment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@JsonProperty
|
||||||
|
public Boolean getUsed()
|
||||||
|
{
|
||||||
|
return used;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
|
@ -88,7 +103,8 @@ public class DataSegmentPlus
|
||||||
DataSegmentPlus that = (DataSegmentPlus) o;
|
DataSegmentPlus that = (DataSegmentPlus) o;
|
||||||
return Objects.equals(dataSegment, that.getDataSegment())
|
return Objects.equals(dataSegment, that.getDataSegment())
|
||||||
&& Objects.equals(createdDate, that.getCreatedDate())
|
&& Objects.equals(createdDate, that.getCreatedDate())
|
||||||
&& Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate());
|
&& Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate())
|
||||||
|
&& Objects.equals(used, that.getUsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -97,7 +113,8 @@ public class DataSegmentPlus
|
||||||
return Objects.hash(
|
return Objects.hash(
|
||||||
dataSegment,
|
dataSegment,
|
||||||
createdDate,
|
createdDate,
|
||||||
usedStatusLastUpdatedDate
|
usedStatusLastUpdatedDate,
|
||||||
|
used
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +125,7 @@ public class DataSegmentPlus
|
||||||
"createdDate=" + getCreatedDate() +
|
"createdDate=" + getCreatedDate() +
|
||||||
", usedStatusLastUpdatedDate=" + getUsedStatusLastUpdatedDate() +
|
", usedStatusLastUpdatedDate=" + getUsedStatusLastUpdatedDate() +
|
||||||
", dataSegment=" + getDataSegment() +
|
", dataSegment=" + getDataSegment() +
|
||||||
|
", used=" + getUsed() +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,9 @@ import org.apache.druid.client.ImmutableDruidDataSource;
|
||||||
import org.apache.druid.client.ImmutableSegmentLoadInfo;
|
import org.apache.druid.client.ImmutableSegmentLoadInfo;
|
||||||
import org.apache.druid.client.SegmentLoadInfo;
|
import org.apache.druid.client.SegmentLoadInfo;
|
||||||
import org.apache.druid.common.guava.FutureUtils;
|
import org.apache.druid.common.guava.FutureUtils;
|
||||||
|
import org.apache.druid.error.DruidException;
|
||||||
|
import org.apache.druid.error.ErrorResponse;
|
||||||
|
import org.apache.druid.error.InvalidInput;
|
||||||
import org.apache.druid.guice.annotations.PublicApi;
|
import org.apache.druid.guice.annotations.PublicApi;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
@ -49,7 +52,6 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.metadata.MetadataRuleManager;
|
import org.apache.druid.metadata.MetadataRuleManager;
|
||||||
import org.apache.druid.metadata.SegmentsMetadataManager;
|
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||||
import org.apache.druid.metadata.UnknownSegmentIdsException;
|
|
||||||
import org.apache.druid.query.SegmentDescriptor;
|
import org.apache.druid.query.SegmentDescriptor;
|
||||||
import org.apache.druid.query.TableDataSource;
|
import org.apache.druid.query.TableDataSource;
|
||||||
import org.apache.druid.rpc.indexing.OverlordClient;
|
import org.apache.druid.rpc.indexing.OverlordClient;
|
||||||
|
@ -184,7 +186,7 @@ public class DataSourcesResource
|
||||||
|
|
||||||
private interface SegmentUpdateOperation
|
private interface SegmentUpdateOperation
|
||||||
{
|
{
|
||||||
int perform() throws UnknownSegmentIdsException;
|
int perform();
|
||||||
}
|
}
|
||||||
|
|
||||||
@POST
|
@POST
|
||||||
|
@ -193,8 +195,8 @@ public class DataSourcesResource
|
||||||
@ResourceFilters(DatasourceResourceFilter.class)
|
@ResourceFilters(DatasourceResourceFilter.class)
|
||||||
public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName)
|
public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName)
|
||||||
{
|
{
|
||||||
SegmentUpdateOperation operation = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(
|
SegmentUpdateOperation operation = () -> segmentsMetadataManager
|
||||||
dataSourceName);
|
.markAsUsedAllNonOvershadowedSegmentsInDataSource(dataSourceName);
|
||||||
return performSegmentUpdate(dataSourceName, operation);
|
return performSegmentUpdate(dataSourceName, operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,6 +215,21 @@ public class DataSourcesResource
|
||||||
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
|
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
|
||||||
} else {
|
} else {
|
||||||
final Set<String> segmentIds = payload.getSegmentIds();
|
final Set<String> segmentIds = payload.getSegmentIds();
|
||||||
|
if (segmentIds == null || segmentIds.isEmpty()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate segmentIds
|
||||||
|
final List<String> invalidSegmentIds = new ArrayList<>();
|
||||||
|
for (String segmentId : segmentIds) {
|
||||||
|
if (SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId).isEmpty()) {
|
||||||
|
invalidSegmentIds.add(segmentId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!invalidSegmentIds.isEmpty()) {
|
||||||
|
throw InvalidInput.exception("Could not parse invalid segment IDs[%s]", invalidSegmentIds);
|
||||||
|
}
|
||||||
|
|
||||||
return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
|
return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -298,11 +315,10 @@ public class DataSourcesResource
|
||||||
int numChangedSegments = operation.perform();
|
int numChangedSegments = operation.perform();
|
||||||
return Response.ok(ImmutableMap.of("numChangedSegments", numChangedSegments)).build();
|
return Response.ok(ImmutableMap.of("numChangedSegments", numChangedSegments)).build();
|
||||||
}
|
}
|
||||||
catch (UnknownSegmentIdsException e) {
|
catch (DruidException e) {
|
||||||
log.warn("Could not find segmentIds[%s]", e.getUnknownSegmentIds());
|
|
||||||
return Response
|
return Response
|
||||||
.status(Response.Status.NOT_FOUND)
|
.status(e.getStatusCode())
|
||||||
.entity(ImmutableMap.of("message", e.getMessage()))
|
.entity(new ErrorResponse(e))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import org.apache.druid.client.DataSourcesSnapshot;
|
import org.apache.druid.client.DataSourcesSnapshot;
|
||||||
import org.apache.druid.client.ImmutableDruidDataSource;
|
import org.apache.druid.client.ImmutableDruidDataSource;
|
||||||
|
import org.apache.druid.error.DruidException;
|
||||||
|
import org.apache.druid.error.DruidExceptionMatcher;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
@ -38,6 +40,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||||
import org.apache.druid.timeline.DataSegment;
|
import org.apache.druid.timeline.DataSegment;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.apache.druid.timeline.partition.NoneShardSpec;
|
import org.apache.druid.timeline.partition.NoneShardSpec;
|
||||||
|
import org.hamcrest.MatcherAssert;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
@ -602,9 +605,14 @@ public class SqlSegmentsMetadataManagerTest
|
||||||
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
|
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertThrows(
|
MatcherAssert.assertThat(
|
||||||
UnknownSegmentIdsException.class,
|
Assert.assertThrows(
|
||||||
() -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds)
|
DruidException.class,
|
||||||
|
() -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds)
|
||||||
|
),
|
||||||
|
DruidExceptionMatcher
|
||||||
|
.invalidInput()
|
||||||
|
.expectMessageContains("Could not find segment IDs")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -627,9 +635,14 @@ public class SqlSegmentsMetadataManagerTest
|
||||||
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
|
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertThrows(
|
MatcherAssert.assertThat(
|
||||||
UnknownSegmentIdsException.class,
|
Assert.assertThrows(
|
||||||
() -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds)
|
DruidException.class,
|
||||||
|
() -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds)
|
||||||
|
),
|
||||||
|
DruidExceptionMatcher
|
||||||
|
.invalidInput()
|
||||||
|
.expectMessageContains("Could not find segment IDs")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,8 @@ public class DataSegmentPlusTest
|
||||||
1
|
1
|
||||||
),
|
),
|
||||||
createdDate,
|
createdDate,
|
||||||
usedStatusLastUpdatedDate
|
usedStatusLastUpdatedDate,
|
||||||
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
final Map<String, Object> objectMap = MAPPER.readValue(
|
final Map<String, Object> objectMap = MAPPER.readValue(
|
||||||
|
@ -105,7 +106,7 @@ public class DataSegmentPlusTest
|
||||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertEquals(3, objectMap.size());
|
Assert.assertEquals(4, objectMap.size());
|
||||||
final Map<String, Object> segmentObjectMap = MAPPER.readValue(
|
final Map<String, Object> segmentObjectMap = MAPPER.readValue(
|
||||||
MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
|
MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
|
||||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.druid.error.DruidExceptionMatcher;
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.metadata.MetadataRuleManager;
|
import org.apache.druid.metadata.MetadataRuleManager;
|
||||||
import org.apache.druid.metadata.SegmentsMetadataManager;
|
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||||
import org.apache.druid.metadata.UnknownSegmentIdsException;
|
|
||||||
import org.apache.druid.query.SegmentDescriptor;
|
import org.apache.druid.query.SegmentDescriptor;
|
||||||
import org.apache.druid.query.TableDataSource;
|
import org.apache.druid.query.TableDataSource;
|
||||||
import org.apache.druid.rpc.indexing.OverlordClient;
|
import org.apache.druid.rpc.indexing.OverlordClient;
|
||||||
|
@ -775,7 +774,7 @@ public class DataSourcesResourceTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMarkAsUsedNonOvershadowedSegmentsSet() throws UnknownSegmentIdsException
|
public void testMarkAsUsedNonOvershadowedSegmentsSet()
|
||||||
{
|
{
|
||||||
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
|
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
|
||||||
Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString());
|
Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString());
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class MetadataResourceTest
|
||||||
.toArray(new DataSegment[0]);
|
.toArray(new DataSegment[0]);
|
||||||
|
|
||||||
private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
|
private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
|
||||||
.map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc()))
|
.map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
private HttpServletRequest request;
|
private HttpServletRequest request;
|
||||||
private SegmentsMetadataManager segmentsMetadataManager;
|
private SegmentsMetadataManager segmentsMetadataManager;
|
||||||
|
|
Loading…
Reference in New Issue