Avoid expensive findEntry call in segment metadata query (#10892)

* Avoid expensive findEntry call in segment metadata query

* other places

* Remove findEntry

* Fix add cost

* Refactor a bit

* Add performance test

* Add comment

* Review comments

* intellij
This commit is contained in:
Abhishek Agarwal 2021-03-09 11:38:33 +05:30 committed by GitHub
parent ae620921df
commit 489f5b1a03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 449 additions and 135 deletions

View File

@ -19,7 +19,7 @@
package org.apache.druid.timeline; package org.apache.druid.timeline;
import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -51,5 +51,9 @@ public interface TimelineLookup<VersionType, ObjectType extends Overshadowable<O
*/ */
List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval); List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);
@Nullable PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version); /**
* Finds the {@link PartitionChunk} for the given time interval, version and chunk number.
*/
@Nullable
PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum);
} }

View File

@ -20,7 +20,6 @@
package org.apache.druid.timeline; package org.apache.druid.timeline;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable; import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
@ -117,10 +116,14 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
) )
{ {
timeline.addAll( timeline.addAll(
Iterators.transform(segments, segment -> segment.getShardSpec().createChunk(segment)), Iterators.transform(
DataSegment::getInterval, segments,
DataSegment::getVersion segment -> new PartitionChunkEntry<>(
); segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
));
} }
public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries() public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
@ -183,13 +186,11 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object) public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{ {
addAll(Iterators.singletonIterator(object), o -> interval, o -> version); addAll(Iterators.singletonIterator(new PartitionChunkEntry<>(interval, version, object)));
} }
private void addAll( public void addAll(
final Iterator<PartitionChunk<ObjectType>> objects, final Iterator<PartitionChunkEntry<VersionType, ObjectType>> objects
final Function<ObjectType, Interval> intervalFunction,
final Function<ObjectType, VersionType> versionFunction
) )
{ {
lock.writeLock().lock(); lock.writeLock().lock();
@ -198,9 +199,10 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>(); final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>();
while (objects.hasNext()) { while (objects.hasNext()) {
PartitionChunk<ObjectType> object = objects.next(); PartitionChunkEntry<VersionType, ObjectType> chunkEntry = objects.next();
Interval interval = intervalFunction.apply(object.getObject()); PartitionChunk<ObjectType> object = chunkEntry.getChunk();
VersionType version = versionFunction.apply(object.getObject()); Interval interval = chunkEntry.getInterval();
VersionType version = chunkEntry.getVersion();
Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval); Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
TimelineEntry entry; TimelineEntry entry;
@ -284,7 +286,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
@Override @Override
@Nullable @Nullable
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version) public PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum)
{ {
lock.readLock().lock(); lock.readLock().lock();
try { try {
@ -292,7 +294,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) { if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version); TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) { if (foundEntry != null) {
return foundEntry.getPartitionHolder().asImmutable(); return foundEntry.getPartitionHolder().getChunk(partitionNum);
} }
} }
} }
@ -849,4 +851,41 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
return Objects.hash(trueInterval, version, partitionHolder); return Objects.hash(trueInterval, version, partitionHolder);
} }
} }
/**
* Stores a {@link PartitionChunk} for a given interval and version. The
* interval corresponds to the {@link LogicalSegment#getInterval()}
*/
public static class PartitionChunkEntry<VersionType, ObjectType>
{
private final Interval interval;
private final VersionType version;
private final PartitionChunk<ObjectType> chunk;
public PartitionChunkEntry(
Interval interval,
VersionType version,
PartitionChunk<ObjectType> chunk
)
{
this.interval = interval;
this.version = version;
this.chunk = chunk;
}
public Interval getInterval()
{
return interval;
}
public VersionType getVersion()
{
return version;
}
public PartitionChunk<ObjectType> getChunk()
{
return chunk;
}
}
} }

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import org.apache.druid.timeline.Overshadowable;
/**
*/
public class ImmutablePartitionHolder<T extends Overshadowable<T>> extends PartitionHolder<T>
{
protected ImmutablePartitionHolder(OvershadowableManager<T> overshadowableManager)
{
super(overshadowableManager);
}
@Override
public PartitionChunk<T> remove(PartitionChunk<T> tPartitionChunk)
{
throw new UnsupportedOperationException();
}
@Override
public boolean add(PartitionChunk<T> tPartitionChunk)
{
throw new UnsupportedOperationException();
}
}

View File

@ -65,11 +65,6 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
this.overshadowableManager = overshadowableManager; this.overshadowableManager = overshadowableManager;
} }
public ImmutablePartitionHolder<T> asImmutable()
{
return new ImmutablePartitionHolder<>(OvershadowableManager.copyVisible(overshadowableManager));
}
public boolean add(PartitionChunk<T> chunk) public boolean add(PartitionChunk<T> chunk)
{ {
return overshadowableManager.addChunk(chunk); return overshadowableManager.addChunk(chunk);

View File

@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition;
/** /**
*/ */
@Deprecated
public class SingleElementPartitionChunk<T> implements PartitionChunk<T> public class SingleElementPartitionChunk<T> implements PartitionChunk<T>
{ {
private final T element; private final T element;

View File

@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.IntegerPartitionChunk; import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger; import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Days; import org.joda.time.Days;
import org.joda.time.Hours; import org.joda.time.Hours;
@ -221,36 +220,64 @@ public class VersionedIntervalTimelineSpecificDataTest extends VersionedInterval
} }
@Test @Test
public void testFindEntry() public void testFindChunk()
{ {
Assert.assertEquals( assertSingleElementChunks(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), makeSingle("1", 1),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1") timeline.findChunk(Intervals.of("2011-10-01/2011-10-02"), "1", 0)
); );
Assert.assertEquals( assertSingleElementChunks(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), makeSingle("1", 1),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1") timeline.findChunk(Intervals.of("2011-10-01/2011-10-01T10"), "1", 0)
); );
Assert.assertEquals( assertSingleElementChunks(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), makeSingle("1", 1),
timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1") timeline.findChunk(Intervals.of("2011-10-01T02/2011-10-02"), "1", 0)
); );
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 0)
);
IntegerPartitionChunk<OvershadowableInteger> expected = IntegerPartitionChunk.make(
10,
null,
1,
new OvershadowableInteger(
"3",
1,
21
)
);
IntegerPartitionChunk<OvershadowableInteger> actual = (IntegerPartitionChunk<OvershadowableInteger>) timeline.findChunk(
Intervals.of("2011-10-02/2011-10-03"),
"3",
1
);
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getObject(), actual.getObject());
Assert.assertEquals( Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1") timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 1)
); );
Assert.assertEquals( Assert.assertEquals(
null, null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "2") timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "2", 0)
); );
Assert.assertEquals( Assert.assertEquals(
null, null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-02T17"), "1") timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
);
Assert.assertEquals(
null,
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
); );
} }

View File

@ -51,14 +51,14 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
} }
@Test @Test
public void testFindEntryWithOverlap() public void testFindChunkWithOverlap()
{ {
add("2011-01-01/2011-01-10", "1", 1); add("2011-01-01/2011-01-10", "1", 1);
add("2011-01-02/2011-01-05", "2", 1); add("2011-01-02/2011-01-05", "2", 1);
Assert.assertEquals( assertSingleElementChunks(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), makeSingle("1", 1),
timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1") timeline.findChunk(Intervals.of("2011-01-02T02/2011-01-04"), "1", 0)
); );
} }

View File

@ -98,6 +98,16 @@ public class VersionedIntervalTimelineTestBase
Assert.assertEquals(expected, actualSet); Assert.assertEquals(expected, actualSet);
} }
static void assertSingleElementChunks(
PartitionChunk<OvershadowableInteger> expected,
PartitionChunk<OvershadowableInteger> actual
)
{
SingleElementPartitionChunk<OvershadowableInteger> expectedSingle = (SingleElementPartitionChunk<OvershadowableInteger>) expected;
SingleElementPartitionChunk<OvershadowableInteger> actualSingle = (SingleElementPartitionChunk<OvershadowableInteger>) actual;
Assert.assertEquals(expectedSingle.getObject(), actualSingle.getObject());
}
static VersionedIntervalTimeline<String, OvershadowableInteger> makeStringIntegerTimeline() static VersionedIntervalTimeline<String, OvershadowableInteger> makeStringIntegerTimeline()
{ {
return new VersionedIntervalTimeline<>(Ordering.natural()); return new VersionedIntervalTimeline<>(Ordering.natural());

View File

@ -26,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.RangeSet; import com.google.common.collect.RangeSet;
@ -84,8 +85,8 @@ import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.VersionedIntervalTimeline.PartitionChunkEntry;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -98,6 +99,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
@ -229,20 +231,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
return CachingClusteredClient.this.run( return CachingClusteredClient.this.run(
queryPlus, queryPlus,
responseContext, responseContext,
timeline -> { new TimelineConverter(specs),
final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
new VersionedIntervalTimeline<>(Ordering.natural());
for (SegmentDescriptor spec : specs) {
final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
if (entry != null) {
final PartitionChunk<ServerSelector> chunk = entry.getChunk(spec.getPartitionNumber());
if (chunk != null) {
timeline2.add(spec.getInterval(), spec.getVersion(), chunk);
}
}
}
return timeline2;
},
true true
); );
} }
@ -856,4 +845,49 @@ public class CachingClusteredClient implements QuerySegmentWalker
return strategy.computeCacheKey(query); return strategy.computeCacheKey(query);
} }
} }
private static class TimelineConverter implements UnaryOperator<TimelineLookup<String, ServerSelector>>
{
private final Iterable<SegmentDescriptor> specs;
TimelineConverter(final Iterable<SegmentDescriptor> specs)
{
this.specs = specs;
}
@Override
public TimelineLookup<String, ServerSelector> apply(TimelineLookup<String, ServerSelector> timeline)
{
final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
new VersionedIntervalTimeline<>(Ordering.natural());
Iterator<PartitionChunkEntry<String, ServerSelector>> unfilteredIterator =
Iterators.transform(specs.iterator(), spec -> toChunkEntry(timeline, spec));
Iterator<PartitionChunkEntry<String, ServerSelector>> iterator = Iterators.filter(
unfilteredIterator,
Objects::nonNull
);
// VersionedIntervalTimeline#addAll implementation is much more efficient than calling VersionedIntervalTimeline#add
// in a loop when there are lot of segments to be added for same interval and version.
timeline2.addAll(iterator);
return timeline2;
}
@Nullable
private PartitionChunkEntry<String, ServerSelector> toChunkEntry(
TimelineLookup<String, ServerSelector> timeline,
SegmentDescriptor spec
)
{
PartitionChunk<ServerSelector> chunk = timeline.findChunk(
spec.getInterval(),
spec.getVersion(),
spec.getPartitionNumber()
);
if (null == chunk) {
return null;
}
return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk);
}
}
} }

View File

@ -67,7 +67,6 @@ import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable; import java.io.Closeable;
@ -187,15 +186,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform( Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs, specs,
descriptor -> { descriptor -> {
final PartitionHolder<Sink> holder = sinkTimeline.findEntry( final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
descriptor.getInterval(), descriptor.getInterval(),
descriptor.getVersion() descriptor.getVersion(),
descriptor.getPartitionNumber()
); );
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) { if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
} }

View File

@ -40,7 +40,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CollectionUtils; import org.apache.druid.utils.CollectionUtils;
@ -234,12 +233,13 @@ public class SegmentManager
final DataSourceState dataSourceState = v == null ? new DataSourceState() : v; final DataSourceState dataSourceState = v == null ? new DataSourceState() : v;
final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals =
dataSourceState.getTimeline(); dataSourceState.getTimeline();
final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry( final PartitionChunk<ReferenceCountingSegment> entry = loadedIntervals.findChunk(
segment.getInterval(), segment.getInterval(),
segment.getVersion() segment.getVersion(),
segment.getShardSpec().getPartitionNum()
); );
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { if (entry != null) {
log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId()); log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false); resultSupplier.set(false);
} else { } else {

View File

@ -68,7 +68,6 @@ import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Collections; import java.util.Collections;
@ -251,16 +250,12 @@ public class ServerManager implements QuerySegmentWalker
Optional<byte[]> cacheKeyPrefix Optional<byte[]> cacheKeyPrefix
) )
{ {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry( final PartitionChunk<ReferenceCountingSegment> chunk = timeline.findChunk(
descriptor.getInterval(), descriptor.getInterval(),
descriptor.getVersion() descriptor.getVersion(),
descriptor.getPartitionNumber()
); );
if (entry == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(descriptor.getPartitionNumber());
if (chunk == null) { if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
} }

View File

@ -110,22 +110,20 @@ public class BrokerServerViewTest extends CuratorTestBase
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
final int partition = segment.getShardSpec().getPartitionNum();
final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D");
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline( TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
).get(); ).get();
List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup( List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes = timeline.lookup(intervals);
Intervals.of(
"2014-10-20T00:00:00Z/P1D"
)
);
Assert.assertEquals(1, serverLookupRes.size()); Assert.assertEquals(1, serverLookupRes.size());
TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = serverLookupRes.get(0); TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = serverLookupRes.get(0);
Assert.assertEquals(Intervals.of("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); Assert.assertEquals(intervals, actualTimelineObjectHolder.getInterval());
Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());
PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject(); PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject();
@ -136,15 +134,16 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertFalse(selector.isEmpty()); Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(segment, selector.getSegment());
Assert.assertEquals(druidServer, selector.pick(null).getServer()); Assert.assertEquals(druidServer, selector.pick(null).getServer());
Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition));
unannounceSegmentForServer(druidServer, segment, zkPathsConfig); unannounceSegmentForServer(druidServer, segment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Assert.assertEquals( Assert.assertEquals(
0, 0,
((List<TimelineObjectHolder>) timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D"))).size() timeline.lookup(intervals).size()
); );
Assert.assertNull(timeline.findEntry(Intervals.of("2014-10-20T00:00:00Z/P1D"), "v1")); Assert.assertNull(timeline.findChunk(intervals, "v1", partition));
} }
@Test @Test

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.TestSequence;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerManagerTest;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import static org.mockito.ArgumentMatchers.any;
/**
* Performance tests for {@link CachingClusteredClient}, that do not require a real cluster, can be added here.
* There is one test for a scenario where a single interval has large number of segments.
*/
public class CachingClusteredClientPerfTest
{
@Test(timeout = 10_000)
public void testGetQueryRunnerForSegments_singleIntervalLargeSegments()
{
final int segmentCount = 30_000;
final Interval interval = Intervals.of("2021-02-13/2021-02-14");
final List<SegmentDescriptor> segmentDescriptors = new ArrayList<>(segmentCount);
final List<DataSegment> dataSegments = new ArrayList<>(segmentCount);
final VersionedIntervalTimeline<String, ServerSelector> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
final DruidServer server = new DruidServer(
"server",
"localhost:9000",
null,
Long.MAX_VALUE,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_PRIORITY
);
for (int ii = 0; ii < segmentCount; ii++) {
segmentDescriptors.add(new SegmentDescriptor(interval, "1", ii));
DataSegment segment = makeDataSegment("test", interval, "1", ii);
dataSegments.add(segment);
}
timeline.addAll(
Iterators.transform(dataSegments.iterator(), segment -> {
ServerSelector ss = new ServerSelector(
segment,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
ss.addServerAndUpdateSegment(new QueryableDruidServer(
server,
new MockQueryRunner()
), segment);
return new VersionedIntervalTimeline.PartitionChunkEntry<>(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(ss)
);
})
);
TimelineServerView serverView = Mockito.mock(TimelineServerView.class);
QueryScheduler queryScheduler = Mockito.mock(QueryScheduler.class);
// mock scheduler to return same sequence as argument
Mockito.when(queryScheduler.run(any(), any())).thenAnswer(i -> i.getArgument(1));
Mockito.when(queryScheduler.prioritizeAndLaneQuery(any(), any()))
.thenAnswer(i -> ((QueryPlus) i.getArgument(0)).getQuery());
Mockito.doReturn(Optional.of(timeline)).when(serverView).getTimeline(any());
Mockito.doReturn(new MockQueryRunner()).when(serverView).getQueryRunner(any());
CachingClusteredClient cachingClusteredClient = new CachingClusteredClient(
new MockQueryToolChestWareHouse(),
serverView,
MapCache.create(1024),
TestHelper.makeJsonMapper(),
Mockito.mock(CachePopulator.class),
new CacheConfig(),
Mockito.mock(DruidHttpClientConfig.class),
Mockito.mock(DruidProcessingConfig.class),
ForkJoinPool.commonPool(),
queryScheduler,
NoopJoinableFactory.INSTANCE
);
Query<SegmentDescriptor> fakeQuery = makeFakeQuery(interval);
QueryRunner<SegmentDescriptor> queryRunner = cachingClusteredClient.getQueryRunnerForSegments(
fakeQuery,
segmentDescriptors
);
Sequence<SegmentDescriptor> sequence = queryRunner.run(QueryPlus.wrap(fakeQuery));
Assert.assertEquals(segmentDescriptors, sequence.toList());
}
private Query<SegmentDescriptor> makeFakeQuery(Interval interval)
{
return new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Collections.singletonList(interval)),
false,
ImmutableMap.of(BaseQuery.QUERY_ID, "testQuery")
);
}
private DataSegment makeDataSegment(String dataSource, Interval interval, String version, int partition)
{
return DataSegment.builder()
.dataSource(dataSource)
.interval(interval)
.version(version)
.shardSpec(new LinearShardSpec(partition))
.size(1)
.build();
}
private static class MockQueryToolChestWareHouse implements QueryToolChestWarehouse
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(QueryType query)
{
return new ServerManagerTest.NoopQueryToolChest<>();
}
}
private static class MockQueryRunner implements QueryRunner<SegmentDescriptor>
{
@Override
public Sequence<SegmentDescriptor> run(
QueryPlus<SegmentDescriptor> queryPlus,
ResponseContext responseContext
)
{
TestQuery query = (TestQuery) queryPlus.getQuery();
return TestSequence.create(((MultipleSpecificSegmentSpec) query.getSpec()).getDescriptors());
}
}
private static class TestQuery extends BaseQuery<SegmentDescriptor>
{
private QuerySegmentSpec spec;
public TestQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
boolean descending,
Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, descending, context);
}
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return "string";
}
@Override
public Query<SegmentDescriptor> withOverriddenContext(Map<String, Object> contextOverride)
{
return this;
}
@Override
public Query<SegmentDescriptor> withQuerySegmentSpec(QuerySegmentSpec spec)
{
this.spec = spec;
return this;
}
@Override
public Query<SegmentDescriptor> withDataSource(DataSource dataSource)
{
return this;
}
public QuerySegmentSpec getSpec()
{
return spec;
}
}
}

View File

@ -101,18 +101,20 @@ public class CoordinatorServerViewTest extends CuratorTestBase
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
final int partition = segment.getShardSpec().getPartitionNum();
final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D");
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view"));
List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup( List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup(
Intervals.of("2014-10-20T00:00:00Z/P1D") intervals
); );
Assert.assertEquals(1, serverLookupRes.size()); Assert.assertEquals(1, serverLookupRes.size());
TimelineObjectHolder<String, SegmentLoadInfo> actualTimelineObjectHolder = serverLookupRes.get(0); TimelineObjectHolder<String, SegmentLoadInfo> actualTimelineObjectHolder = serverLookupRes.get(0);
Assert.assertEquals(Intervals.of("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); Assert.assertEquals(intervals, actualTimelineObjectHolder.getInterval());
Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());
PartitionHolder<SegmentLoadInfo> actualPartitionHolder = actualTimelineObjectHolder.getObject(); PartitionHolder<SegmentLoadInfo> actualPartitionHolder = actualTimelineObjectHolder.getObject();
@ -125,6 +127,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
druidServer.getMetadata(), druidServer.getMetadata(),
Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()) Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())
); );
Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition));
unannounceSegmentForServer(druidServer, segment); unannounceSegmentForServer(druidServer, segment);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@ -133,7 +136,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
0, 0,
((List<TimelineObjectHolder>) timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D"))).size() ((List<TimelineObjectHolder>) timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D"))).size()
); );
Assert.assertNull(timeline.findEntry(Intervals.of("2014-10-20T00:00:00Z/P1D"), "v1")); Assert.assertNull(timeline.findChunk(intervals, "v1", partition));
} }
@Test @Test

View File

@ -53,7 +53,6 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -261,11 +260,12 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
final List<WindowedSegment> retVal = new ArrayList<>(); final List<WindowedSegment> retVal = new ArrayList<>();
for (SegmentDescriptor spec : specs) { for (SegmentDescriptor spec : specs) {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry( final PartitionChunk<ReferenceCountingSegment> entry = timeline.findChunk(
spec.getInterval(), spec.getInterval(),
spec.getVersion() spec.getVersion(),
spec.getPartitionNumber()
); );
retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval())); retVal.add(new WindowedSegment(entry.getObject(), spec.getInterval()));
} }
return retVal; return retVal;