Fix queries for updated segments on SinkQuerySegmentWalker (#17157)

Fix how SinkQuerySegmentWalker resolves segment descriptors from queries when segments have been upgraded as a result of a concurrent replace.

Concurrent append and replace:
With the introduction of concurrent append and replace, for a given interval:

The same sink can correspond to a base segment V0_x0 and also be mapped to multiple higher versions with distinct partition numbers, such as V1_x1, ..., Vn_xn.
The initial segment allocation can happen on version V0, but a task can perform several allocations over its lifecycle, with versions spanning V0 to Vn.
Changes:
Maintain a new timeline of SinkHolder objects (an Overshadowable holding a SegmentDescriptor and its Sink).
Every segment allocation or version upgrade adds the latest segment descriptor to this timeline.
Iterate over this timeline instead of the sinkTimeline to obtain the segment descriptors in getQueryRunnerForIntervals.
Also maintain a mapping from each upgraded segment to its base segment.
When a sink is needed to process the query, find the base segment corresponding to the given descriptor, and then use the sinkTimeline to find its chunk. A minimal sketch of this id-to-sink resolution idea follows.
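
The sketch below is a minimal, self-contained model of this idea and not Druid's actual API: the Descriptor record, the Sink placeholder, and the map-based registry are stand-ins for SegmentDescriptor, Sink, and the VersionedIntervalTimeline of SinkHolder objects that the patch introduces. It only illustrates that the base id and every upgraded id resolve to the same in-memory sink, and that the sink's registration outlives any single id until all versions have been unregistered.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the id-to-sink mapping: several segment ids (interval + version + partition)
// produced by allocation and concurrent replace all point at one shared in-memory sink.
public class UpgradedSegmentRegistrySketch
{
  // Stand-in for Druid's SegmentDescriptor.
  record Descriptor(String interval, String version, int partition) {}

  // Stand-in for Druid's Sink: the in-memory data a task is appending to.
  static class Sink
  {
    final String name;
    Sink(String name) { this.name = name; }
  }

  // Stand-in for the VersionedIntervalTimeline<String, SinkHolder> added by the patch:
  // every registered descriptor (base or upgraded) maps to the sink holding the data.
  private final Map<Descriptor, Sink> descriptorToSink = new ConcurrentHashMap<>();

  // Mirrors registerUpgradedPendingSegment: called on allocation and on every version upgrade.
  void register(Descriptor descriptor, Sink sink)
  {
    descriptorToSink.put(descriptor, sink);
  }

  // Mirrors unregisterUpgradedPendingSegment: called when a version of the segment is dropped.
  void unregister(Descriptor descriptor)
  {
    descriptorToSink.remove(descriptor);
  }

  // Query-time lookup: any registered version resolves to the same sink.
  Sink findSink(Descriptor descriptor)
  {
    return descriptorToSink.get(descriptor);
  }

  public static void main(String[] args)
  {
    UpgradedSegmentRegistrySketch registry = new UpgradedSegmentRegistrySketch();
    Sink sink = new Sink("data for 2000/2001");

    Descriptor base = new Descriptor("2000/2001", "V0", 0);      // initial allocation
    Descriptor upgraded = new Descriptor("2000/2001", "V1", 3);  // result of a concurrent replace

    registry.register(base, sink);
    registry.register(upgraded, sink);

    // A query may carry either descriptor; both resolve to the same in-memory sink.
    System.out.println(registry.findSink(base) == registry.findSink(upgraded)); // true

    // On handoff each version is dropped individually; once all ids are gone the sink is unreachable.
    registry.unregister(upgraded);
    registry.unregister(base);
    System.out.println(registry.findSink(base)); // null
  }
}

In the actual patch the registry is a VersionedIntervalTimeline keyed by interval and version, so partial-interval descriptors and overshadowing are handled correctly; the map here is only meant to convey the shared-sink relationship.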
AmatyaAvadhanula 2024-10-09 14:43:17 +05:30 committed by GitHub
parent a395368622
commit 88d26e4541
4 changed files with 863 additions and 77 deletions

SinkQuerySegmentWalker.java

@ -61,8 +61,10 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.sink.Sink;
import org.apache.druid.segment.realtime.sink.SinkSegmentReference;
import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;
@ -74,8 +76,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -90,7 +90,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final String dataSource;
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
// Maintain a timeline of ids and Sinks for all the segments including the base and upgraded versions
private final VersionedIntervalTimeline<String, SinkHolder> upgradedSegmentsTimeline;
private final ObjectMapper objectMapper;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
@ -98,12 +99,10 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment
= new ConcurrentHashMap<>();
public SinkQuerySegmentWalker(
String dataSource,
VersionedIntervalTimeline<String, Sink> sinkTimeline,
VersionedIntervalTimeline<String, SinkHolder> upgradedSegmentsTimeline,
ObjectMapper objectMapper,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
@ -114,7 +113,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
this.upgradedSegmentsTimeline = upgradedSegmentsTimeline;
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
@ -133,7 +132,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
final Iterable<SegmentDescriptor> specs = FunctionalIterable
.create(intervals)
.transformCat(sinkTimeline::lookup)
.transformCat(upgradedSegmentsTimeline::lookup)
.transformCat(
holder -> FunctionalIterable
.create(holder.getObject())
@ -196,9 +195,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
try {
for (final SegmentDescriptor newDescriptor : specs) {
final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
for (final SegmentDescriptor descriptor : specs) {
final PartitionChunk<SinkHolder> chunk = upgradedSegmentsTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
descriptor.getPartitionNumber()
@ -212,7 +210,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
continue;
}
final Sink theSink = chunk.getObject();
final Sink theSink = chunk.getObject().sink;
final SegmentId sinkSegmentId = theSink.getSegment().getId();
segmentIdMap.put(descriptor, sinkSegmentId);
final List<SinkSegmentReference> sinkSegmentReferences =
@ -361,14 +359,41 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
}
}
public void registerUpgradedPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec upgradedPendingSegment
)
/**
* Must be called when a segment is announced by a task.
* Either the base segment upon allocation or any upgraded version due to a concurrent replace
*/
public void registerUpgradedPendingSegment(SegmentIdWithShardSpec id, Sink sink)
{
newIdToBasePendingSegment.put(
upgradedPendingSegment.asSegmentId().toDescriptor(),
basePendingSegment.asSegmentId().toDescriptor()
final SegmentDescriptor upgradedDescriptor = id.asSegmentId().toDescriptor();
upgradedSegmentsTimeline.add(
upgradedDescriptor.getInterval(),
upgradedDescriptor.getVersion(),
IntegerPartitionChunk.make(
null,
null,
upgradedDescriptor.getPartitionNumber(),
new SinkHolder(upgradedDescriptor, sink)
)
);
}
/**
* Must be called when dropping sink from the sinkTimeline
* It is the responsibility of the caller to unregister all associated ids including the base id
*/
public void unregisterUpgradedPendingSegment(SegmentIdWithShardSpec id, Sink sink)
{
final SegmentDescriptor upgradedDescriptor = id.asSegmentId().toDescriptor();
upgradedSegmentsTimeline.remove(
upgradedDescriptor.getInterval(),
upgradedDescriptor.getVersion(),
IntegerPartitionChunk.make(
null,
null,
upgradedDescriptor.getPartitionNumber(),
new SinkHolder(upgradedDescriptor, sink)
)
);
}
@ -378,11 +403,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
return dataSource;
}
public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
{
return sinkTimeline;
}
public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
{
return makeHydrantCacheIdentifier(hydrant.getSegmentId(), hydrant.getCount());
@ -395,4 +415,46 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
private static class SinkHolder implements Overshadowable<SinkHolder>
{
private final Sink sink;
private final SegmentDescriptor segmentDescriptor;
private SinkHolder(SegmentDescriptor segmentDescriptor, Sink sink)
{
this.segmentDescriptor = segmentDescriptor;
this.sink = sink;
}
@Override
public int getStartRootPartitionId()
{
return segmentDescriptor.getPartitionNumber();
}
@Override
public int getEndRootPartitionId()
{
return segmentDescriptor.getPartitionNumber() + 1;
}
@Override
public String getVersion()
{
return segmentDescriptor.getVersion();
}
@Override
public short getMinorVersion()
{
return 0;
}
@Override
public short getAtomicUpdateGroupSize()
{
return 1;
}
}
}

StreamAppenderator.java

@ -82,7 +82,6 @@ import org.apache.druid.segment.realtime.sink.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -95,6 +94,7 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -150,7 +150,6 @@ public class StreamAppenderator implements Appenderator
private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SegmentIdWithShardSpec> idToPendingSegment = new ConcurrentHashMap<>();
private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
private final boolean useMaxMemoryEstimates;
@ -250,14 +249,6 @@ public class StreamAppenderator implements Appenderator
this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
if (sinkQuerySegmentWalker == null) {
this.sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);
} else {
this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
}
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
@ -612,12 +603,35 @@ public class StreamAppenderator implements Appenderator
}
}
/**
* Mark a given version of a segment as abandoned and return its base segment if it can be dropped.
* Return null if there are other valid versions of the segment that are yet to be dropped.
*/
private SegmentIdWithShardSpec abandonUpgradedIdentifier(final SegmentIdWithShardSpec identifier)
{
final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
synchronized (abandonedSegments) {
abandonedSegments.add(identifier);
if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
Set<SegmentIdWithShardSpec> relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
relevantSegments.removeAll(abandonedSegments);
// If there are unabandoned segments associated with the sink, return early
// This may be the case if segments have been upgraded as the result of a concurrent replace
if (!relevantSegments.isEmpty()) {
return null;
}
}
}
return baseIdentifier;
}
@Override
public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
{
final Sink sink = sinks.get(identifier);
final SegmentIdWithShardSpec baseIdentifier = abandonUpgradedIdentifier(identifier);
final Sink sink = baseIdentifier == null ? null : sinks.get(baseIdentifier);
if (sink != null) {
return abandonSegment(identifier, sink, true);
return abandonSegment(baseIdentifier, sink, true);
} else {
return Futures.immediateFuture(null);
}
@ -1107,13 +1121,17 @@ public class StreamAppenderator implements Appenderator
/**
* Unannounces the given base segment and all its upgraded versions.
*
* @param baseSegment base segment
* @param sink sink corresponding to the base segment
* @return the set of all segment ids associated with the base segment containing the upgraded ids and itself.
*/
private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
private Set<SegmentIdWithShardSpec> unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
{
synchronized (sink) {
final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment);
if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
return;
return Collections.emptySet();
}
final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId);
@ -1132,6 +1150,7 @@ public class StreamAppenderator implements Appenderator
unannounceSegment(newSegment);
upgradedSegmentToBaseSegment.remove(newId);
}
return upgradedVersionsOfSegment;
}
}
@ -1155,11 +1174,13 @@ public class StreamAppenderator implements Appenderator
return;
}
final Sink sink = sinks.get(basePendingSegment);
// Update query mapping with SinkQuerySegmentWalker
((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment);
((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(upgradedPendingSegment, sink);
// Announce segments
final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
final DataSegment baseSegment = sink.getSegment();
final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment);
segmentAnnouncer.announceSegment(newSegment);
@ -1434,11 +1455,7 @@ public class StreamAppenderator implements Appenderator
baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
baseSegmentToUpgradedSegments.get(identifier).add(identifier);
sinkTimeline.add(
sink.getInterval(),
sink.getVersion(),
identifier.getShardSpec().createChunk(sink)
);
((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(identifier, sink);
}
private ListenableFuture<?> abandonSegment(
@ -1447,19 +1464,6 @@ public class StreamAppenderator implements Appenderator
final boolean removeOnDiskData
)
{
abandonedSegments.add(identifier);
final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
synchronized (sink) {
if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
Set<SegmentIdWithShardSpec> relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
relevantSegments.removeAll(abandonedSegments);
// If there are unabandoned segments associated with the sink, return early
// This may be the case if segments have been upgraded as the result of a concurrent replace
if (!relevantSegments.isEmpty()) {
return Futures.immediateFuture(null);
}
}
}
// Ensure no future writes will be made to this sink.
if (sink.finishWriting()) {
// Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement,
@ -1477,7 +1481,7 @@ public class StreamAppenderator implements Appenderator
}
// Mark this identifier as dropping, so no future push tasks will pick it up.
droppingSinks.add(baseIdentifier);
droppingSinks.add(identifier);
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(
@ -1488,8 +1492,8 @@ public class StreamAppenderator implements Appenderator
@Override
public Void apply(@Nullable Object input)
{
if (!sinks.remove(baseIdentifier, sink)) {
log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier);
if (!sinks.remove(identifier, sink)) {
log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier);
return null;
}
@ -1497,17 +1501,17 @@ public class StreamAppenderator implements Appenderator
if (removeOnDiskData) {
// Remove this segment from the committed list. This must be done from the persist thread.
log.debug("Removing commit metadata for segment[%s].", baseIdentifier);
log.debug("Removing commit metadata for segment[%s].", identifier);
try {
commitLock.lock();
final Committed oldCommit = readCommit();
if (oldCommit != null) {
writeCommit(oldCommit.without(baseIdentifier.toString()));
writeCommit(oldCommit.without(identifier.toString()));
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource())
.addData("identifier", baseIdentifier.toString())
.addData("identifier", identifier.toString())
.emit();
throw new RuntimeException(e);
}
@ -1516,15 +1520,14 @@ public class StreamAppenderator implements Appenderator
}
}
unannounceAllVersionsOfSegment(sink.getSegment(), sink);
final Set<SegmentIdWithShardSpec> allVersionIds = unannounceAllVersionsOfSegment(sink.getSegment(), sink);
Runnable removeRunnable = () -> {
droppingSinks.remove(baseIdentifier);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
baseIdentifier.getShardSpec().createChunk(sink)
);
droppingSinks.remove(identifier);
for (SegmentIdWithShardSpec id : allVersionIds) {
// Update query mapping with SinkQuerySegmentWalker
((SinkQuerySegmentWalker) texasRanger).unregisterUpgradedPendingSegment(id, sink);
}
for (FireHydrant hydrant : sink) {
if (cache != null) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
@ -1533,7 +1536,7 @@ public class StreamAppenderator implements Appenderator
}
if (removeOnDiskData) {
removeDirectory(computePersistDir(baseIdentifier));
removeDirectory(computePersistDir(identifier));
}
log.info("Dropped segment[%s].", identifier);

UnifiedIndexerAppenderatorsManager.java

@ -65,7 +65,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.sink.Sink;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -353,12 +352,9 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
{
this.taskAppenderatorMap = new HashMap<>();
VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);
this.walker = new SinkQuerySegmentWalker(
dataSource,
sinkTimeline,
new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER),
objectMapper,
serviceEmitter,
queryRunnerFactoryConglomerateProvider.get(),

StreamAppenderatorTest.java

@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Order;
import org.apache.druid.query.QueryPlus;
@ -1132,6 +1133,730 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
}
}
@Test
public void testQueryBySegments_withSegmentVersionUpgrades() throws Exception
{
try (
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.build()) {
final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
// Segment0 for interval upgraded after appends
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2000/2001", "B", 1),
si("2000/2001", "B", 1).asSegmentId().toString(),
IDENTIFIERS.get(0).asSegmentId().toString(),
IDENTIFIERS.get(0).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
// Segment1 allocated for version B
appenderator.add(si("2000/2001", "B", 2), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
// Concurrent replace registers a segment version upgrade for the second interval
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2001/2002", "B", 1),
si("2001/2002", "B", 1).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
// Another Concurrent replace registers upgrade with version C for the second interval
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2001/2002", "C", 7),
si("2001/2002", "C", 7).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
// Query1: segment #2
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
IDENTIFIERS.get(2).getInterval(),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final TimeseriesQuery query1_B = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
IDENTIFIERS.get(2).getInterval(),
"B",
1
)
)
)
)
.build();
final TimeseriesQuery query1_C = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
IDENTIFIERS.get(2).getInterval(),
"C",
7
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results1
);
final List<Result<TimeseriesResultValue>> results1_B =
QueryPlus.wrap(query1_B).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1_B",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results1_B
);
final List<Result<TimeseriesResultValue>> results1_C =
QueryPlus.wrap(query1_C).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1_C",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results1_C
);
// Query2: segment #2, partial
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final TimeseriesQuery query2_B = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"B",
1
)
)
)
)
.build();
final TimeseriesQuery query2_C = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"C",
7
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results2
);
final List<Result<TimeseriesResultValue>> results2_B =
QueryPlus.wrap(query2_B).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2_B",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results2_B
);
final List<Result<TimeseriesResultValue>> results2_C =
QueryPlus.wrap(query2_C).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2_C",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results2_C
);
// Query3: segment #2, two disjoint intervals
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final TimeseriesQuery query3_B = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"B",
1
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
"B",
1
)
)
)
)
.build();
final TimeseriesQuery query3_C = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"C",
7
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
"C",
7
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query3",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results3
);
final List<Result<TimeseriesResultValue>> results3_B =
QueryPlus.wrap(query3_B).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query3_B",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results3_B
);
final List<Result<TimeseriesResultValue>> results3_C =
QueryPlus.wrap(query3_C).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query3_C",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results3_C
);
final ScanQuery query4 = Druids.newScanQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.order(Order.ASCENDING)
.batchSize(10)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final ScanQuery query4_B = Druids.newScanQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"B",
1
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
"B",
1
)
)
)
)
.order(Order.ASCENDING)
.batchSize(10)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final ScanQuery query4_C = Druids.newScanQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
"C",
7
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
"C",
7
)
)
)
)
.order(Order.ASCENDING)
.batchSize(10)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final List<ScanResultValue> results4 =
QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
((List<Object>) ((List<Object>) results4.get(0).getEvents()).get(0)).toArray()
);
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(1).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
);
final List<ScanResultValue> results4_B =
QueryPlus.wrap(query4_B).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(2, results4_B.size()); // 2 segments, 1 row per segment
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_B.get(0).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
((List<Object>) ((List<Object>) results4_B.get(0).getEvents()).get(0)).toArray()
);
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_B.get(1).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4_B.get(1).getEvents()).get(0)).toArray()
);
final List<ScanResultValue> results4_C =
QueryPlus.wrap(query4_C).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(2, results4_C.size()); // 2 segments, 1 row per segment
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_C.get(0).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
((List<Object>) ((List<Object>) results4_C.get(0).getEvents()).get(0)).toArray()
);
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4_C.get(1).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4_C.get(1).getEvents()).get(0)).toArray()
);
}
}
@Test
public void testQueryByIntervals_withSegmentVersionUpgrades() throws Exception
{
try (
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.build()) {
final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
// Segment0 for interval upgraded after appends
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2000/2001", "B", 1),
si("2000/2001", "B", 1).asSegmentId().toString(),
IDENTIFIERS.get(0).asSegmentId().toString(),
IDENTIFIERS.get(0).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
// Segment1 allocated for version B
appenderator.add(si("2000/2001", "B", 2), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
// Concurrent replace registers a segment version upgrade for the second interval
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2001/2002", "B", 1),
si("2001/2002", "B", 1).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
// Another Concurrent replace registers upgrade with version C for the second interval
appenderator.registerUpgradedPendingSegment(
new PendingSegmentRecord(
si("2001/2002", "C", 7),
si("2001/2002", "C", 7).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString(),
StreamAppenderatorTester.DATASOURCE
)
);
// Query1: 2000/2001
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
)
),
results1
);
// Query2: 2000/2002
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2002")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results2
);
// Query3: 2000/2001T01
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results3
);
// Query4: 2000/2001T01, 2001T03/2001T04
final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
ImmutableList.of(
Intervals.of("2000/2001T01"),
Intervals.of("2001T03/2001T04")
)
)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results4 =
QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results4
);
// Drop segment
appenderator.drop(IDENTIFIERS.get(0)).get();
// Drop its upgraded version (Drop happens for each version on handoff)
appenderator.drop(si("2000/2001", "B", 1)).get();
final List<Result<TimeseriesResultValue>> resultsAfterDrop1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
)
),
resultsAfterDrop1
);
final List<Result<TimeseriesResultValue>> resultsAfterDrop2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
resultsAfterDrop2
);
final List<Result<TimeseriesResultValue>> resultsAfterDrop3 =
QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
resultsAfterDrop3
);
final List<Result<TimeseriesResultValue>> resultsAfterDrop4 =
QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
resultsAfterDrop4
);
}
}
@Test
public void testQueryByIntervals() throws Exception
{