Consolidate a bunch of ad-hoc segments metadata SQL; fix some bugs. (#11582)

* Consolidate a bunch of ad-hoc segments metadata SQL; fix some bugs.

This patch gathers together a variety of SQL from SqlSegmentsMetadataManager
and IndexerSQLMetadataStorageCoordinator into a new class SqlSegmentsMetadataQuery.
It focuses on SQL related to retrieving segment payloads and marking
segments used and unused.

In addition to cleaning up the code a bit, this patch also fixes a bug
with years before 0 or after 9999. The prior SQL did not work properly
because dates outside this range cannot be compared as strings. The new
code does work for these far-past and far-future years.

So, if you're ever interested in using Druid to analyze things from
ancient Babylon, you better apply this patch first!

* Fix test compiling.

* Fixes and improvements.

* Fix forbidden API.

* Additional fixes.
This commit is contained in:
Gian Merlino 2021-11-24 14:51:53 -08:00 committed by GitHub
parent 12e2228510
commit 3d72e66f56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 771 additions and 376 deletions

View File

@ -24,6 +24,7 @@ import io.netty.util.SuppressForbidden;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Months;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
@ -38,6 +39,8 @@ public final class DateTimes
public static final DateTime EPOCH = utc(0);
public static final DateTime MAX = utc(JodaUtils.MAX_INSTANT);
public static final DateTime MIN = utc(JodaUtils.MIN_INSTANT);
public static final DateTime CAN_COMPARE_AS_YEAR_MIN = of("0000-01-01");
public static final DateTime CAN_COMPARE_AS_YEAR_MAX = of("10000-01-01").minus(1);
public static final UtcFormatter ISO_DATE_TIME = wrapFormatter(ISODateTimeFormat.dateTime());
public static final UtcFormatter ISO_DATE_OPTIONAL_TIME = wrapFormatter(ISODateTimeFormat.dateOptionalTimeParser());
@ -65,6 +68,7 @@ public final class DateTimes
/**
* @return The dateTimezone for the provided {@param tzId}. If {@param fallback} is true, the default timezone
* will be returned if the tzId does not match a supported timezone.
*
* @throws IllegalArgumentException if {@param fallback} is false and the provided tzId doesn't match a supported
* timezone.
*/
@ -173,6 +177,22 @@ public final class DateTimes
return Months.monthsBetween(time1, time2).getMonths();
}
/**
* Returns true if the provided DateTime can be compared against other DateTimes using its string representation.
* Useful when generating SQL queries to the metadata store, or any other situation where time comparisons on
* string representations might be useful.
*
* Conditions: the datetime must be between years 0 and 9999 (inclusive) and must be in the ISO UTC chronology.
*
* See also {@link Intervals#canCompareEndpointsAsStrings(Interval)}.
*/
public static boolean canCompareAsString(final DateTime dateTime)
{
return dateTime.getMillis() >= CAN_COMPARE_AS_YEAR_MIN.getMillis()
&& dateTime.getMillis() <= CAN_COMPARE_AS_YEAR_MAX.getMillis()
&& ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
}
private DateTimes()
{
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
@ -48,6 +49,25 @@ public final class Intervals
return interval.getStart().equals(interval.getEnd());
}
/**
* Returns true if the provided interval has endpoints that can be compared against other DateTimes using their
* string representations.
*
* See also {@link DateTimes#canCompareAsString(DateTime)}.
*/
public static boolean canCompareEndpointsAsStrings(final Interval interval)
{
return DateTimes.canCompareAsString(interval.getStart()) && DateTimes.canCompareAsString(interval.getEnd());
}
/**
* Returns true if the provided interval contains all time.
*/
public static boolean isEternity(final Interval interval)
{
return ETERNITY.equals(interval);
}
private Intervals()
{
}

View File

@ -100,4 +100,27 @@ public class DateTimesTest
String invalid = "51729200AZ";
DateTimes.of(invalid);
}
@Test
public void testCanCompareAsString()
{
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.EPOCH));
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("0000-01-01")));
Assert.assertEquals("0000-01-01T00:00:00.000Z", DateTimes.CAN_COMPARE_AS_YEAR_MIN.toString());
Assert.assertEquals("9999-12-31T23:59:59.999Z", DateTimes.CAN_COMPARE_AS_YEAR_MAX.toString());
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("9999")));
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("2000")));
Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MIN));
Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.MAX));
Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("-1-01-01T00:00:00")));
Assert.assertFalse(DateTimes.canCompareAsString(DateTimes.of("10000-01-01")));
// Can't compare as string with mixed time zones.
Assert.assertFalse(DateTimes.canCompareAsString(
DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
);
}
}

View File

@ -405,7 +405,7 @@ public class MaterializedViewSupervisor implements Supervisor
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId().toString());
sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId());
}
}
// data of the latest interval will be built firstly.

View File

@ -74,7 +74,7 @@ public class RetrieveSegmentsActionsTest
expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId().toString()));
expectedUnusedSegments.forEach(s -> actionTestKit.getSegmentsMetadataManager().markSegmentAsUnused(s.getId()));
}
private DataSegment createSegment(Interval interval, String version)

View File

@ -64,12 +64,12 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId()
)
);
Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId().toString()
newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId()
)
);
@ -114,7 +114,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId()
)
);

View File

@ -27,7 +27,6 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
@ -46,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@ -60,8 +60,6 @@ import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
@ -90,6 +88,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
*
*/
public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
@ -193,58 +192,27 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
// 2 range conditions are used on different columns, but not all SQL databases properly optimize it.
// Some databases can only use an index on one of the columns. An additional condition provides
// explicit knowledge that 'start' cannot be greater than 'end'.
return handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start "
+ "and start <= :end and %2$send%2$s <= :end and used = false",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.fold(
new ArrayList<>(),
(Folder3<List<DataSegment>, byte[]>) (accumulator, payload, foldController, statementContext) -> {
accumulator.add(JacksonUtils.readValue(jsonMapper, payload, DataSegment.class));
return accumulator;
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval))) {
return ImmutableList.copyOf(iterator);
}
);
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{
int numSegmentsMarkedUnused = connector.retryTransaction(
(handle, status) -> {
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=false WHERE dataSource = :dataSource "
+ "AND start >= :start AND %2$send%2$s <= :end",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.execute();
},
final Integer numSegmentsMarkedUnused = connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.markSegmentsUnused(dataSource, interval),
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
@ -292,14 +260,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
) throws IOException
{
Query<Map<String, Object>> sql = createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals);
try (final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
return VersionedIntervalTimeline.forSegments(
Iterators.transform(dbSegments, payload -> JacksonUtils.readValue(jsonMapper, payload, DataSegment.class))
);
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
return VersionedIntervalTimeline.forSegments(iterator);
}
}
@ -307,50 +273,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
) throws IOException
{
return createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals)
.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes("payload"), DataSegment.class))
.list();
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
final List<DataSegment> retVal = new ArrayList<>();
iterator.forEachRemaining(retVal::add);
return retVal;
}
/**
* Creates a query to the metadata store which selects payload from the segments table for all segments which are
* marked as used and whose interval intersects (not just abuts) with any of the intervals given to this method.
*/
private Query<Map<String, Object>> createUsedSegmentsSqlQueryForIntervals(
Handle handle,
String dataSource,
List<Interval> intervals
)
{
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ?");
if (!intervals.isEmpty()) {
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
StringUtils.format("(start < ? AND %1$send%1$s > ?)", connector.getQuoteString())
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
}
Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.bind(0, dataSource);
for (int i = 0; i < intervals.size(); i++) {
Interval interval = intervals.get(i);
sql = sql
.bind(2 * i + 1, interval.getEnd().toString())
.bind(2 * i + 2, interval.getStart().toString());
}
return sql;
}
@Override
@ -1312,7 +1243,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
*/
protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
final Handle handle,
final Set<DataSegment> segmentsToDrop,
final Collection<DataSegment> segmentsToDrop,
final String dataSource
)
{
@ -1331,18 +1262,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
return DataStoreMetadataUpdateResult.FAILURE;
}
final List<String> segmentIdList = segmentsToDrop.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList());
Batch batch = handle.createBatch();
segmentIdList.forEach(segmentId -> batch.add(
StringUtils.format(
"UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'",
dbTables.getSegmentsTable(),
dataSource,
segmentId
)
));
final int[] segmentChanges = batch.execute();
int numChangedSegments = SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIdList, segmentChanges);
final int numChangedSegments =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper).markSegments(
segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()),
false
);
if (numChangedSegments != segmentsToDrop.size()) {
log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]",
numChangedSegments,

View File

@ -73,14 +73,14 @@ public interface SegmentsMetadataManager
int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);
int markSegmentsAsUnused(String dataSource, Set<String> segmentIds);
int markSegmentsAsUnused(Set<SegmentId> segmentIds);
/**
* Returns true if the state of the segment entry is changed in the database as the result of this call (that is, the
* segment was marked as unused), false otherwise. If the call results in a database error, an exception is relayed to
* the caller.
*/
boolean markSegmentAsUnused(String segmentId);
boolean markSegmentAsUnused(SegmentId segmentId);
/**
* If there are used segments belonging to the given data source this method returns them as an {@link

View File

@ -36,14 +36,15 @@ import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
@ -54,10 +55,8 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.BaseResultSetMapper;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
@ -69,6 +68,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -81,7 +81,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
*
@ -558,70 +557,51 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
*/
private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
{
List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>();
List<DataSegment> unusedSegmentsInInterval = new ArrayList<>();
final List<DataSegment> unusedSegments = new ArrayList<>();
final VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(Collections.emptyList());
connector.inReadOnlyTransaction(
(handle, status) -> {
String queryString =
StringUtils.format("SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", getSegmentsTable());
if (interval != null) {
queryString += StringUtils.format(" AND start < :end AND %1$send%1$s > :start", connector.getQuoteString());
}
Query<?> query = handle
.createQuery(queryString)
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSourceName);
if (interval != null) {
query = query
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
}
query = query
.map((int index, ResultSet resultSet, StatementContext context) -> {
DataSegment segment =
JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), DataSegment.class);
if (resultSet.getBoolean("used")) {
usedSegmentsOverlappingInterval.add(segment);
} else {
if (interval == null || interval.contains(segment.getInterval())) {
unusedSegmentsInInterval.add(segment);
}
}
//noinspection ReturnOfNull: intentional, consume() call below doesn't use the results.
return null;
});
// Consume the query results to ensure usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
// populated.
consume(query.iterator());
return null;
}
);
final SqlSegmentsMetadataQuery queryTool =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper);
VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator())
);
final List<Interval> intervals =
interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval);
return markNonOvershadowedSegmentsAsUsed(unusedSegmentsInInterval, versionedIntervalTimeline);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUsedSegments(dataSourceName, intervals)) {
VersionedIntervalTimeline.addSegments(timeline, iterator);
}
private static void consume(Iterator<?> iterator)
{
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals)) {
while (iterator.hasNext()) {
iterator.next();
final DataSegment dataSegment = iterator.next();
VersionedIntervalTimeline.addSegments(timeline, Iterators.singletonIterator(dataSegment));
unusedSegments.add(dataSegment);
}
}
//noinspection ReturnOfNull: This consumer operates by side effects
return null;
}
);
return markNonOvershadowedSegmentsAsUsed(unusedSegments, timeline);
}
private int markNonOvershadowedSegmentsAsUsed(
List<DataSegment> unusedSegments,
VersionedIntervalTimeline<String, DataSegment> timeline
)
{
List<String> segmentIdsToMarkAsUsed = new ArrayList<>();
List<SegmentId> segmentIdsToMarkAsUsed = new ArrayList<>();
for (DataSegment segment : unusedSegments) {
if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) {
continue;
}
segmentIdsToMarkAsUsed.add(segment.getId().toString());
segmentIdsToMarkAsUsed.add(segment.getId());
}
return markSegmentsAsUsed(segmentIdsToMarkAsUsed);
@ -639,13 +619,14 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
List<Interval> unusedSegmentsIntervals = JodaUtils.condenseIntervals(
unusedSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
Iterator<DataSegment> usedSegmentsOverlappingUnusedSegmentsIntervals =
retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle);
try (CloseableIterator<DataSegment> usedSegmentsOverlappingUnusedSegmentsIntervals =
retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle)) {
VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator())
);
return new Pair<>(unusedSegments, timeline);
}
}
);
List<DataSegment> unusedSegments = unusedSegmentsAndTimeline.lhs;
@ -720,68 +701,39 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
return segments;
}
private Iterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(
private CloseableIterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(
final String dataSource,
final Collection<Interval> intervals,
final Handle handle
)
{
return intervals
.stream()
.flatMap(interval -> {
Iterable<DataSegment> segmentResultIterable = () -> handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %1$s "
+ "WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true",
getSegmentsTable(),
connector.getQuoteString()
)
)
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map((int index, ResultSet resultSet, StatementContext context) ->
JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), DataSegment.class)
)
.iterator();
return StreamSupport.stream(segmentResultIterable.spliterator(), false);
})
.iterator();
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.retrieveUsedSegments(dataSource, intervals);
}
private int markSegmentsAsUsed(final List<String> segmentIds)
private int markSegmentsAsUsed(final List<SegmentId> segmentIds)
{
if (segmentIds.isEmpty()) {
log.info("No segments found to update!");
return 0;
}
return connector.getDBI().withHandle(handle -> {
Batch batch = handle.createBatch();
segmentIds.forEach(segmentId -> batch.add(
StringUtils.format("UPDATE %s SET used=true WHERE id = '%s'", getSegmentsTable(), segmentId)
));
int[] segmentChanges = batch.execute();
return computeNumChangedSegments(segmentIds, segmentChanges);
});
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegments(segmentIds, true)
);
}
@Override
public int markAsUnusedAllSegmentsInDataSource(final String dataSource)
{
try {
final int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
(Handle handle) -> handle
.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
)
.bind("dataSource", dataSource)
.execute()
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegmentsUnused(dataSource, Intervals.ETERNITY)
);
return numUpdatedDatabaseEntries;
}
catch (RuntimeException e) {
log.error(e, "Exception marking all segments as unused in data source [%s]", dataSource);
@ -794,10 +746,16 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* snapshot update. The update of the segment's state will be reflected after the next {@link DatabasePoll}.
*/
@Override
public boolean markSegmentAsUnused(final String segmentId)
public boolean markSegmentAsUnused(final SegmentId segmentId)
{
try {
return markSegmentAsUnusedInDatabase(segmentId);
final int numSegments = connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegments(Collections.singletonList(segmentId), false)
);
return numSegments > 0;
}
catch (RuntimeException e) {
log.error(e, "Exception marking segment [%s] as unused", segmentId);
@ -806,109 +764,30 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
}
@Override
public int markSegmentsAsUnused(String dataSourceName, Set<String> segmentIds)
public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
{
if (segmentIds.isEmpty()) {
return 0;
}
final List<String> segmentIdList = new ArrayList<>(segmentIds);
try {
return connector.getDBI().withHandle(handle -> {
Batch batch = handle.createBatch();
segmentIdList.forEach(segmentId -> batch.add(
StringUtils.format(
"UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'",
getSegmentsTable(),
dataSourceName,
segmentId
)
));
final int[] segmentChanges = batch.execute();
return computeNumChangedSegments(segmentIdList, segmentChanges);
});
}
catch (Exception e) {
throw new RuntimeException(e);
}
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegments(segmentIds, false)
);
}
@Override
public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval)
{
try {
Integer numUpdatedDatabaseEntries = connector.getDBI().withHandle(
handle -> handle
.createStatement(
StringUtils
.format(
"UPDATE %s SET used=false WHERE datasource = :datasource "
+ "AND start >= :start AND %2$send%2$s <= :end",
getSegmentsTable(),
connector.getQuoteString()
))
.bind("datasource", dataSourceName)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.execute()
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegmentsUnused(dataSourceName, interval)
);
return numUpdatedDatabaseEntries;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private boolean markSegmentAsUnusedInDatabase(String segmentId)
{
final int numUpdatedRows = connector.getDBI().withHandle(
handle -> handle
.createStatement(StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable()))
.bind("segmentID", segmentId)
.execute()
);
if (numUpdatedRows < 0) {
log.assertionError(
"Negative number of rows updated for segment id [%s]: %d",
segmentId,
numUpdatedRows
);
} else if (numUpdatedRows > 1) {
log.error(
"More than one row updated for segment id [%s]: %d, "
+ "there may be more than one row for the segment id in the database",
segmentId,
numUpdatedRows
);
}
return numUpdatedRows > 0;
}
static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
{
int numChangedSegments = 0;
for (int i = 0; i < segmentChanges.length; i++) {
int numUpdatedRows = segmentChanges[i];
if (numUpdatedRows < 0) {
log.assertionError(
"Negative number of rows updated for segment id [%s]: %d",
segmentIds.get(i),
numUpdatedRows
);
} else if (numUpdatedRows > 1) {
log.error(
"More than one row updated for segment id [%s]: %d, "
+ "there may be more than one row for the segment id in the database",
segmentIds.get(i),
numUpdatedRows
);
}
if (numUpdatedRows > 0) {
numChangedSegments += 1;
}
}
return numChangedSegments;
}
@Override
public @Nullable ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSourceName)
{

View File

@ -0,0 +1,359 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* An object that helps {@link SqlSegmentsMetadataManager} and {@link IndexerSQLMetadataStorageCoordinator} make
* queries to the metadata store segments table. Each instance of this class is scoped to a single handle and is meant
* to be short-lived.
*/
public class SqlSegmentsMetadataQuery
{
private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);
private final Handle handle;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig dbTables;
private final ObjectMapper jsonMapper;
private SqlSegmentsMetadataQuery(
final Handle handle,
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig dbTables,
final ObjectMapper jsonMapper
)
{
this.handle = handle;
this.connector = connector;
this.dbTables = dbTables;
this.jsonMapper = jsonMapper;
}
/**
* Create a query object. This instance is scoped to a single handle and is meant to be short-lived. It is okay
* to use it for more than one query, though.
*/
public static SqlSegmentsMetadataQuery forHandle(
final Handle handle,
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig dbTables,
final ObjectMapper jsonMapper
)
{
return new SqlSegmentsMetadataQuery(handle, connector, dbTables, jsonMapper);
}
/**
* Retrieves segments for a given datasource that are marked used (i.e. published) in the metadata store, and that
* *overlap* any interval in a particular collection of intervals. If the collection of intervals is empty, this
* method will retrieve all used segments.
*
* You cannot assume that segments returned by this call are actually active. Because there is some delay between
* new segment publishing and the marking-unused of older segments, it is possible that some segments returned
* by this call are overshadowed by other segments. To check for this, use
* {@link org.apache.druid.timeline.VersionedIntervalTimeline#forSegments(Iterator)}.
*
* This call does not return any information about realtime segments.
*
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true);
}
/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval
* in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all
* unused segments.
*
* This call does not return any information about realtime segments.
*
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false);
}
/**
* Marks the provided segments as either used or unused.
*
* Returns the number of segments actually modified.
*/
public int markSegments(final Collection<SegmentId> segmentIds, final boolean used)
{
final String dataSource;
if (segmentIds.isEmpty()) {
return 0;
} else {
dataSource = segmentIds.iterator().next().getDataSource();
if (segmentIds.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) {
throw new IAE("Segments to drop must all be part of the same datasource");
}
}
final PreparedBatch batch =
handle.prepareBatch(
StringUtils.format(
"UPDATE %s SET used = ? WHERE datasource = ? AND id = ?",
dbTables.getSegmentsTable()
)
);
for (SegmentId segmentId : segmentIds) {
batch.add(used, dataSource, segmentId.toString());
}
final int[] segmentChanges = batch.execute();
return computeNumChangedSegments(
segmentIds.stream().map(SegmentId::toString).collect(Collectors.toList()),
segmentChanges
);
}
/**
* Marks all segments for a datasource unused that are *fully contained by* a particular interval.
*
* Returns the number of segments actually modified.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval)
{
if (Intervals.isEternity(interval)) {
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used WHERE dataSource = :dataSource",
dbTables.getSegmentsTable()
)
)
.bind("dataSource", dataSource)
.bind("used", false)
.execute();
} else if (Intervals.canCompareEndpointsAsStrings(interval)
&& interval.getStart().getYear() == interval.getEnd().getYear()) {
// Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because
// that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a
// segment interval like "20001/20002".)
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used WHERE dataSource = :dataSource AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
)
.bind("dataSource", dataSource)
.bind("used", false)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.execute();
} else {
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true),
DataSegment::getId
)
);
return markSegments(segments, false);
}
}
private CloseableIterator<DataSegment> retrieveSegments(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
if (compareAsString && !intervals.isEmpty()) {
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
matchMode.makeSqlCondition(
connector.getQuoteString(),
StringUtils.format(":start%d", i),
StringUtils.format(":end%d", i)
)
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
}
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
if (compareAsString) {
final Iterator<Interval> iterator = intervals.iterator();
for (int i = 0; iterator.hasNext(); i++) {
Interval interval = iterator.next();
sql.bind(StringUtils.format("start%d", i), interval.getStart().toString())
.bind(StringUtils.format("end%d", i), interval.getEnd().toString());
}
}
final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
return CloseableIterators.wrap(
Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
}
}
return false;
}
}
),
resultIterator
);
}
private static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
{
int numChangedSegments = 0;
for (int i = 0; i < segmentChanges.length; i++) {
int numUpdatedRows = segmentChanges[i];
if (numUpdatedRows < 0) {
log.assertionError(
"Negative number of rows updated for segment id [%s]: %d",
segmentIds.get(i),
numUpdatedRows
);
} else if (numUpdatedRows > 1) {
log.error(
"More than one row updated for segment id [%s]: %d, "
+ "there may be more than one row for the segment id in the database",
segmentIds.get(i),
numUpdatedRows
);
}
if (numUpdatedRows > 0) {
numChangedSegments += 1;
}
}
return numChangedSegments;
}
enum IntervalMode
{
CONTAINS {
@Override
public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
{
// 2 range conditions are used on different columns, but not all SQL databases properly optimize it.
// Some databases can only use an index on one of the columns. An additional condition provides
// explicit knowledge that 'start' cannot be greater than 'end'.
return StringUtils.format(
"(start >= %2$s and start <= %3$s and %1$send%1$s <= %3$s)",
quoteString,
startPlaceholder,
endPlaceholder
);
}
@Override
public boolean apply(Interval a, Interval b)
{
return a.contains(b);
}
},
OVERLAPS {
@Override
public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
{
return StringUtils.format(
"(start < %3$s AND %1$send%1$s > %2$s)",
quoteString,
startPlaceholder,
endPlaceholder
);
}
@Override
public boolean apply(Interval a, Interval b)
{
return a.overlaps(b);
}
};
public abstract String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder);
public abstract boolean apply(Interval a, Interval b);
}
}

View File

@ -420,7 +420,7 @@ public class DruidCoordinator
public void markSegmentAsUnused(DataSegment segment)
{
log.debug("Marking segment[%s] as unused", segment.getId());
segmentsMetadataManager.markSegmentAsUnused(segment.getId().toString());
segmentsMetadataManager.markSegmentAsUnused(segment.getId());
}
public String getCurrentLeader()

View File

@ -224,8 +224,19 @@ public class DataSourcesResource
if (interval != null) {
return segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
} else {
final Set<String> segmentIds = payload.getSegmentIds();
return segmentsMetadataManager.markSegmentsAsUnused(dataSourceName, segmentIds);
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
.stream()
.map(idStr -> SegmentId.tryParse(dataSourceName, idStr))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// Note: segments for the "wrong" datasource are ignored.
return segmentsMetadataManager.markSegmentsAsUnused(
segmentIds.stream()
.filter(segmentId -> segmentId.getDataSource().equals(dataSourceName))
.collect(Collectors.toSet())
);
}
};
return doMarkSegmentsWithPayload("markSegmentsAsUnused", dataSourceName, payload, markSegments);
@ -633,10 +644,11 @@ public class DataSourcesResource
@ResourceFilters(DatasourceResourceFilter.class)
public Response markSegmentAsUnused(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
@PathParam("segmentId") String segmentIdString
)
{
boolean segmentStateChanged = segmentsMetadataManager.markSegmentAsUnused(segmentId);
final SegmentId segmentId = SegmentId.tryParse(dataSourceName, segmentIdString);
final boolean segmentStateChanged = segmentId != null && segmentsMetadataManager.markSegmentAsUnused(segmentId);
return Response.ok(ImmutableMap.of("segmentStateChanged", segmentStateChanged)).build();
}

View File

@ -218,6 +218,42 @@ public class IndexerSQLMetadataStorageCoordinatorTest
100
);
private final DataSegment hugeTimeRangeSegment1 = new DataSegment(
"hugeTimeRangeDataSource",
Intervals.of("-9994-01-02T00Z/1994-01-03T00Z"),
"zversion",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NumberedShardSpec(0, 1),
9,
100
);
private final DataSegment hugeTimeRangeSegment2 = new DataSegment(
"hugeTimeRangeDataSource",
Intervals.of("2994-01-02T00Z/2994-01-03T00Z"),
"zversion",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NumberedShardSpec(0, 1),
9,
100
);
private final DataSegment hugeTimeRangeSegment3 = new DataSegment(
"hugeTimeRangeDataSource",
Intervals.of("29940-01-02T00Z/29940-01-03T00Z"),
"zversion",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NumberedShardSpec(0, 1),
9,
100
);
private final Set<DataSegment> SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
private final AtomicLong metadataUpdateCounter = new AtomicLong();
private final AtomicLong segmentTableDropUpdateCounter = new AtomicLong();
@ -258,7 +294,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Override
protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
final Handle handle,
final Set<DataSegment> segmentsToDrop,
final Collection<DataSegment> segmentsToDrop,
final String dataSource
)
{
@ -748,7 +784,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null,
null
);
Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1);
Assert.assertEquals(SegmentPublishResult.fail(
"org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1);
Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get());
@ -1096,6 +1133,75 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
@Test
public void testUsedHugeTimeRangeEternityFilter() throws IOException
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(
hugeTimeRangeSegment1,
hugeTimeRangeSegment2,
hugeTimeRangeSegment3
)
);
Assert.assertEquals(
ImmutableSet.of(hugeTimeRangeSegment1, hugeTimeRangeSegment2, hugeTimeRangeSegment3),
ImmutableSet.copyOf(
coordinator.retrieveUsedSegmentsForIntervals(
hugeTimeRangeSegment1.getDataSource(),
Intervals.ONLY_ETERNITY,
Segments.ONLY_VISIBLE
)
)
);
}
@Test
public void testUsedHugeTimeRangeTrickyFilter1() throws IOException
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(
hugeTimeRangeSegment1,
hugeTimeRangeSegment2,
hugeTimeRangeSegment3
)
);
Assert.assertEquals(
ImmutableSet.of(hugeTimeRangeSegment2),
ImmutableSet.copyOf(
coordinator.retrieveUsedSegmentsForInterval(
hugeTimeRangeSegment1.getDataSource(),
Intervals.of("2900/10000"),
Segments.ONLY_VISIBLE
)
)
);
}
@Test
public void testUsedHugeTimeRangeTrickyFilter2() throws IOException
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(
hugeTimeRangeSegment1,
hugeTimeRangeSegment2,
hugeTimeRangeSegment3
)
);
Assert.assertEquals(
ImmutableSet.of(hugeTimeRangeSegment2),
ImmutableSet.copyOf(
coordinator.retrieveUsedSegmentsForInterval(
hugeTimeRangeSegment1.getDataSource(),
Intervals.of("2993/2995"),
Segments.ONLY_VISIBLE
)
)
);
}
@Test
public void testDeleteDataSourceMetadata() throws IOException
{
@ -1648,7 +1754,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
prevSegmentId = identifier.toString();
}
final int numDeleted = coordinator.deletePendingSegmentsCreatedInInterval(dataSource, new Interval(begin, secondBegin));
final int numDeleted = coordinator.deletePendingSegmentsCreatedInInterval(
dataSource,
new Interval(begin, secondBegin)
);
Assert.assertEquals(10, numDeleted);
}
@ -1904,7 +2013,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(defaultSegment.getId().toString(), usedSegments.get(0));
// Try drop segment
IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource());
IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(
handle,
ImmutableSet.of(defaultSegment),
defaultSegment.getDataSource()
);
Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.SUCCESS, result);
usedSegments = retrieveUsedSegmentIds();
@ -1917,7 +2030,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
try (Handle handle = derbyConnector.getDBI().open()) {
// Try drop segment
IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource());
IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(
handle,
ImmutableSet.of(defaultSegment),
defaultSegment.getDataSource()
);
Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result);
}
}
@ -1938,7 +2055,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
// Try delete. Datasource should not be deleted as it is in excluded set
int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of("fooDataSource"));
int deletedCount = coordinator.removeDataSourceMetadataOlderThan(
System.currentTimeMillis(),
ImmutableSet.of("fooDataSource")
);
// Datasource should not be deleted
Assert.assertEquals(
@ -1974,7 +2094,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
@Test
public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception
public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted()
throws Exception
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
@ -1990,7 +2111,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Do delete. Datasource metadata should not be deleted. Datasource is not active but it was created just now so it's
// created timestamp will be later than the timestamp 2012-01-01T00:00:00Z
int deletedCount = coordinator.removeDataSourceMetadataOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis(), ImmutableSet.of());
int deletedCount = coordinator.removeDataSourceMetadataOlderThan(
DateTimes.of("2012-01-01T00:00:00Z").getMillis(),
ImmutableSet.of()
);
// Datasource should not be deleted
Assert.assertEquals(
@ -2001,7 +2125,39 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
@Test
public void testMarkSegmentsAsUnusedWithinInterval() throws IOException
public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException
{
coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2));
// interval covers existingSegment1 and partially overlaps existingSegment2,
// only existingSegment1 will be dropped
coordinator.markSegmentsAsUnusedWithinInterval(
existingSegment1.getDataSource(),
Intervals.of("1994-01-01/1994-01-02T12Z")
);
Assert.assertEquals(
ImmutableSet.of(existingSegment1),
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment1.getDataSource(),
existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
)
)
);
Assert.assertEquals(
ImmutableSet.of(),
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment2.getDataSource(),
existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
)
)
);
}
@Test
public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException
{
coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2));

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -443,7 +444,7 @@ public class SqlSegmentsMetadataManagerTest
awaitDataSourceAppeared(newDataSource);
Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId().toString()));
Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId()));
awaitDataSourceDisappeared(newDataSource);
Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource));
}
@ -706,10 +707,10 @@ public class SqlSegmentsMetadataManagerTest
publisher.publishSegment(newSegment1);
publisher.publishSegment(newSegment2);
final ImmutableSet<String> segmentIds =
ImmutableSet.of(newSegment1.getId().toString(), newSegment1.getId().toString());
final ImmutableSet<SegmentId> segmentIds =
ImmutableSet.of(newSegment1.getId(), newSegment1.getId());
Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(newDataSource, segmentIds));
Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(segmentIds));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
@ -717,31 +718,6 @@ public class SqlSegmentsMetadataManagerTest
);
}
@Test
public void testMarkSegmentsAsUnusedInvalidDataSource() throws IOException
{
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final String newDataSource = "wikipedia2";
final DataSegment newSegment1 = createNewSegment1(newDataSource);
final DataSegment newSegment2 = createNewSegment1(newDataSource);
publisher.publishSegment(newSegment1);
publisher.publishSegment(newSegment2);
final ImmutableSet<String> segmentIds =
ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString());
// none of the segments are in data source
Assert.assertEquals(0, sqlSegmentsMetadataManager.markSegmentsAsUnused("wrongDataSource", segmentIds));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUnusedSegmentsInInterval() throws IOException
{

View File

@ -100,7 +100,7 @@ public class DataSourcesResourceTest
new DataSegment(
"datasource1",
Intervals.of("2010-01-01/P1D"),
"",
"v0",
null,
null,
null,
@ -113,7 +113,7 @@ public class DataSourcesResourceTest
new DataSegment(
"datasource1",
Intervals.of("2010-01-22/P1D"),
"",
"v0",
null,
null,
null,
@ -126,7 +126,7 @@ public class DataSourcesResourceTest
new DataSegment(
"datasource2",
Intervals.of("2010-01-01/P1D"),
"",
"v0",
null,
null,
null,
@ -1018,16 +1018,24 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnused()
{
final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
final Set<String> segmentIds =
dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet());
final Set<SegmentId> segmentIds =
dataSegmentList.stream()
.filter(segment -> segment.getDataSource().equals(dataSource1.getName()))
.map(DataSegment::getId)
.collect(Collectors.toSet());
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds)).andReturn(1).once();
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
new DataSourcesResource.MarkDataSourceSegmentsPayload(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
@ -1041,16 +1049,24 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedNoChanges()
{
final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
final Set<String> segmentIds =
dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet());
final Set<SegmentId> segmentIds =
dataSegmentList.stream()
.filter(segment -> segment.getDataSource().equals(dataSource1.getName()))
.map(DataSegment::getId)
.collect(Collectors.toSet());
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds)).andReturn(0).once();
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
new DataSourcesResource.MarkDataSourceSegmentsPayload(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
@ -1064,18 +1080,26 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedException()
{
final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
final Set<String> segmentIds =
dataSegmentList.stream().map(ds -> ds.getId().toString()).collect(Collectors.toSet());
final Set<SegmentId> segmentIds =
dataSegmentList.stream()
.filter(segment -> segment.getDataSource().equals(dataSource1.getName()))
.map(DataSegment::getId)
.collect(Collectors.toSet());
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused("datasource1", segmentIds))
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds))
.andThrow(new RuntimeException("Exception occurred"))
.once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
new DataSourcesResource.MarkDataSourceSegmentsPayload(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);