diff --git a/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java b/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java index fb9e577b83c..4d85a1d040a 100644 --- a/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java +++ b/core/src/main/java/org/apache/druid/timeline/TimelineLookup.java @@ -19,7 +19,7 @@ package org.apache.druid.timeline; -import org.apache.druid.timeline.partition.PartitionHolder; +import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -51,5 +51,9 @@ public interface TimelineLookup> lookupWithIncompletePartitions(Interval interval); - @Nullable PartitionHolder findEntry(Interval interval, VersionType version); + /** + * Finds the {@link PartitionChunk} for the given time interval, version and chunk number. + */ + @Nullable + PartitionChunk findChunk(Interval interval, VersionType version, int partitionNum); } diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index 376a97ffee7..3b8d4129d77 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -20,7 +20,6 @@ package org.apache.druid.timeline; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterators; @@ -117,10 +116,14 @@ public class VersionedIntervalTimeline segment.getShardSpec().createChunk(segment)), - DataSegment::getInterval, - DataSegment::getVersion - ); + Iterators.transform( + segments, + segment -> new PartitionChunkEntry<>( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(segment) + ) + )); } public Map> getAllTimelineEntries() @@ -183,13 +186,11 @@ public class VersionedIntervalTimeline object) { - addAll(Iterators.singletonIterator(object), o -> interval, o -> version); + addAll(Iterators.singletonIterator(new PartitionChunkEntry<>(interval, version, object))); } - private void addAll( - final Iterator> objects, - final Function intervalFunction, - final Function versionFunction + public void addAll( + final Iterator> objects ) { lock.writeLock().lock(); @@ -198,9 +199,10 @@ public class VersionedIntervalTimeline allEntries = new IdentityHashMap<>(); while (objects.hasNext()) { - PartitionChunk object = objects.next(); - Interval interval = intervalFunction.apply(object.getObject()); - VersionType version = versionFunction.apply(object.getObject()); + PartitionChunkEntry chunkEntry = objects.next(); + PartitionChunk object = chunkEntry.getChunk(); + Interval interval = chunkEntry.getInterval(); + VersionType version = chunkEntry.getVersion(); Map exists = allTimelineEntries.get(interval); TimelineEntry entry; @@ -284,7 +286,7 @@ public class VersionedIntervalTimeline findEntry(Interval interval, VersionType version) + public PartitionChunk findChunk(Interval interval, VersionType version, int partitionNum) { lock.readLock().lock(); try { @@ -292,7 +294,7 @@ public class VersionedIntervalTimeline + { + private final Interval interval; + private final VersionType version; + private final PartitionChunk chunk; + + public PartitionChunkEntry( + Interval interval, + VersionType version, + PartitionChunk chunk + ) + { + this.interval = interval; + this.version = version; + this.chunk = chunk; + } + + public Interval getInterval() + { + return interval; + } + + public VersionType getVersion() + { + return version; + } + + public PartitionChunk getChunk() + { + return chunk; + } + } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java deleted file mode 100644 index 9a3bf115cbb..00000000000 --- a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.timeline.partition; - -import org.apache.druid.timeline.Overshadowable; - -/** - */ -public class ImmutablePartitionHolder> extends PartitionHolder -{ - protected ImmutablePartitionHolder(OvershadowableManager overshadowableManager) - { - super(overshadowableManager); - } - - @Override - public PartitionChunk remove(PartitionChunk tPartitionChunk) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean add(PartitionChunk tPartitionChunk) - { - throw new UnsupportedOperationException(); - } -} diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java index 289ac934825..df133f02af6 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java @@ -65,11 +65,6 @@ public class PartitionHolder> implements Iterable asImmutable() - { - return new ImmutablePartitionHolder<>(OvershadowableManager.copyVisible(overshadowableManager)); - } - public boolean add(PartitionChunk chunk) { return overshadowableManager.addChunk(chunk); diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleElementPartitionChunk.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleElementPartitionChunk.java index 2567fe61755..90f1023bfb3 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleElementPartitionChunk.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleElementPartitionChunk.java @@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition; /** */ +@Deprecated public class SingleElementPartitionChunk implements PartitionChunk { private final T element; diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java index 0bb2354177b..a41eda2fc36 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.partition.IntegerPartitionChunk; import org.apache.druid.timeline.partition.OvershadowableInteger; -import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.DateTime; import org.joda.time.Days; import org.joda.time.Hours; @@ -221,36 +220,64 @@ public class VersionedIntervalTimelineSpecificDataTest extends VersionedInterval } @Test - public void testFindEntry() + public void testFindChunk() { - Assert.assertEquals( - new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), - timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1") + assertSingleElementChunks( + makeSingle("1", 1), + timeline.findChunk(Intervals.of("2011-10-01/2011-10-02"), "1", 0) ); - Assert.assertEquals( - new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), - timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1") + assertSingleElementChunks( + makeSingle("1", 1), + timeline.findChunk(Intervals.of("2011-10-01/2011-10-01T10"), "1", 0) ); - Assert.assertEquals( - new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), - timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1") + assertSingleElementChunks( + makeSingle("1", 1), + timeline.findChunk(Intervals.of("2011-10-01T02/2011-10-02"), "1", 0) ); + assertSingleElementChunks( + makeSingle("1", 1), + timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 0) + ); + + IntegerPartitionChunk expected = IntegerPartitionChunk.make( + 10, + null, + 1, + new OvershadowableInteger( + "3", + 1, + 21 + ) + ); + IntegerPartitionChunk actual = (IntegerPartitionChunk) timeline.findChunk( + Intervals.of("2011-10-02/2011-10-03"), + "3", + 1 + ); + Assert.assertEquals(expected, actual); + Assert.assertEquals(expected.getObject(), actual.getObject()); + Assert.assertEquals( - new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), - timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1") + null, + timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 1) ); Assert.assertEquals( null, - timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "2") + timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "2", 0) ); Assert.assertEquals( null, - timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-02T17"), "1") + timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0) + ); + + Assert.assertEquals( + null, + timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0) ); } diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java index 57abe7a8108..de3c5eeccaf 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java @@ -51,14 +51,14 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest } @Test - public void testFindEntryWithOverlap() + public void testFindChunkWithOverlap() { add("2011-01-01/2011-01-10", "1", 1); add("2011-01-02/2011-01-05", "2", 1); - Assert.assertEquals( - new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), - timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1") + assertSingleElementChunks( + makeSingle("1", 1), + timeline.findChunk(Intervals.of("2011-01-02T02/2011-01-04"), "1", 0) ); } diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java index 3d8c5a34925..53ab4db3cc5 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java @@ -98,6 +98,16 @@ public class VersionedIntervalTimelineTestBase Assert.assertEquals(expected, actualSet); } + static void assertSingleElementChunks( + PartitionChunk expected, + PartitionChunk actual + ) + { + SingleElementPartitionChunk expectedSingle = (SingleElementPartitionChunk) expected; + SingleElementPartitionChunk actualSingle = (SingleElementPartitionChunk) actual; + Assert.assertEquals(expectedSingle.getObject(), actualSingle.getObject()); + } + static VersionedIntervalTimeline makeStringIntegerTimeline() { return new VersionedIntervalTimeline<>(Ordering.natural()); diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index d1adc82cebb..369ba2aa20b 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.RangeSet; @@ -84,8 +85,8 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.VersionedIntervalTimeline.PartitionChunkEntry; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -98,6 +99,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.SortedMap; @@ -229,20 +231,7 @@ public class CachingClusteredClient implements QuerySegmentWalker return CachingClusteredClient.this.run( queryPlus, responseContext, - timeline -> { - final VersionedIntervalTimeline timeline2 = - new VersionedIntervalTimeline<>(Ordering.natural()); - for (SegmentDescriptor spec : specs) { - final PartitionHolder entry = timeline.findEntry(spec.getInterval(), spec.getVersion()); - if (entry != null) { - final PartitionChunk chunk = entry.getChunk(spec.getPartitionNumber()); - if (chunk != null) { - timeline2.add(spec.getInterval(), spec.getVersion(), chunk); - } - } - } - return timeline2; - }, + new TimelineConverter(specs), true ); } @@ -856,4 +845,49 @@ public class CachingClusteredClient implements QuerySegmentWalker return strategy.computeCacheKey(query); } } + + private static class TimelineConverter implements UnaryOperator> + { + private final Iterable specs; + + TimelineConverter(final Iterable specs) + { + this.specs = specs; + } + + @Override + public TimelineLookup apply(TimelineLookup timeline) + { + final VersionedIntervalTimeline timeline2 = + new VersionedIntervalTimeline<>(Ordering.natural()); + Iterator> unfilteredIterator = + Iterators.transform(specs.iterator(), spec -> toChunkEntry(timeline, spec)); + Iterator> iterator = Iterators.filter( + unfilteredIterator, + Objects::nonNull + ); + // VersionedIntervalTimeline#addAll implementation is much more efficient than calling VersionedIntervalTimeline#add + // in a loop when there are lot of segments to be added for same interval and version. + timeline2.addAll(iterator); + return timeline2; + } + + @Nullable + private PartitionChunkEntry toChunkEntry( + TimelineLookup timeline, + SegmentDescriptor spec + ) + { + PartitionChunk chunk = timeline.findChunk( + spec.getInterval(), + spec.getVersion(), + spec.getPartitionNumber() + ); + if (null == chunk) { + return null; + } + return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk); + + } + } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 7704bb89a29..783a327c00b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -67,7 +67,6 @@ import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import java.io.Closeable; @@ -187,15 +186,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker Iterable> perSegmentRunners = Iterables.transform( specs, descriptor -> { - final PartitionHolder holder = sinkTimeline.findEntry( + final PartitionChunk chunk = sinkTimeline.findChunk( descriptor.getInterval(), - descriptor.getVersion() + descriptor.getVersion(), + descriptor.getPartitionNumber() ); - if (holder == null) { - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - final PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber()); if (chunk == null) { return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); } diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index c3636d3a978..93b16a317b2 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -40,7 +40,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CollectionUtils; @@ -234,12 +233,13 @@ public class SegmentManager final DataSourceState dataSourceState = v == null ? new DataSourceState() : v; final VersionedIntervalTimeline loadedIntervals = dataSourceState.getTimeline(); - final PartitionHolder entry = loadedIntervals.findEntry( + final PartitionChunk entry = loadedIntervals.findChunk( segment.getInterval(), - segment.getVersion() + segment.getVersion(), + segment.getShardSpec().getPartitionNum() ); - if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { + if (entry != null) { log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId()); resultSupplier.set(false); } else { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 2ceaf96753a..ffdb38d8051 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -68,7 +68,6 @@ import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import java.util.Collections; @@ -251,16 +250,12 @@ public class ServerManager implements QuerySegmentWalker Optional cacheKeyPrefix ) { - final PartitionHolder entry = timeline.findEntry( + final PartitionChunk chunk = timeline.findChunk( descriptor.getInterval(), - descriptor.getVersion() + descriptor.getVersion(), + descriptor.getPartitionNumber() ); - if (entry == null) { - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - - final PartitionChunk chunk = entry.getChunk(descriptor.getPartitionNumber()); if (chunk == null) { return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 7878e6afdc9..9558133f294 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -110,22 +110,20 @@ public class BrokerServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); + final int partition = segment.getShardSpec().getPartitionNum(); + final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D"); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - TimelineLookup timeline = brokerServerView.getTimeline( + TimelineLookup timeline = brokerServerView.getTimeline( DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) ).get(); - List serverLookupRes = (List) timeline.lookup( - Intervals.of( - "2014-10-20T00:00:00Z/P1D" - ) - ); + List> serverLookupRes = timeline.lookup(intervals); Assert.assertEquals(1, serverLookupRes.size()); TimelineObjectHolder actualTimelineObjectHolder = serverLookupRes.get(0); - Assert.assertEquals(Intervals.of("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); + Assert.assertEquals(intervals, actualTimelineObjectHolder.getInterval()); Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); @@ -136,15 +134,16 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertFalse(selector.isEmpty()); Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(druidServer, selector.pick(null).getServer()); + Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition)); unannounceSegmentForServer(druidServer, segment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertEquals( 0, - ((List) timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D"))).size() + timeline.lookup(intervals).size() ); - Assert.assertNull(timeline.findEntry(Intervals.of("2014-10-20T00:00:00Z/P1D"), "v1")); + Assert.assertNull(timeline.findChunk(intervals, "v1", partition)); } @Test diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java new file mode 100644 index 00000000000..ab1d6cd7b80 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Ordering; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.client.cache.CachePopulator; +import org.apache.druid.client.cache.MapCache; +import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.QueryableDruidServer; +import org.apache.druid.client.selector.RandomServerSelectorStrategy; +import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.guice.http.DruidHttpClientConfig; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.TestSequence; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.coordination.ServerManagerTest; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ForkJoinPool; + +import static org.mockito.ArgumentMatchers.any; + +/** + * Performance tests for {@link CachingClusteredClient}, that do not require a real cluster, can be added here. + * There is one test for a scenario where a single interval has large number of segments. + */ +public class CachingClusteredClientPerfTest +{ + + @Test(timeout = 10_000) + public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() + { + final int segmentCount = 30_000; + final Interval interval = Intervals.of("2021-02-13/2021-02-14"); + final List segmentDescriptors = new ArrayList<>(segmentCount); + final List dataSegments = new ArrayList<>(segmentCount); + final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + final DruidServer server = new DruidServer( + "server", + "localhost:9000", + null, + Long.MAX_VALUE, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + DruidServer.DEFAULT_PRIORITY + ); + + for (int ii = 0; ii < segmentCount; ii++) { + segmentDescriptors.add(new SegmentDescriptor(interval, "1", ii)); + DataSegment segment = makeDataSegment("test", interval, "1", ii); + dataSegments.add(segment); + } + timeline.addAll( + Iterators.transform(dataSegments.iterator(), segment -> { + ServerSelector ss = new ServerSelector( + segment, + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + ); + ss.addServerAndUpdateSegment(new QueryableDruidServer( + server, + new MockQueryRunner() + ), segment); + return new VersionedIntervalTimeline.PartitionChunkEntry<>( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(ss) + ); + }) + ); + + TimelineServerView serverView = Mockito.mock(TimelineServerView.class); + QueryScheduler queryScheduler = Mockito.mock(QueryScheduler.class); + // mock scheduler to return same sequence as argument + Mockito.when(queryScheduler.run(any(), any())).thenAnswer(i -> i.getArgument(1)); + Mockito.when(queryScheduler.prioritizeAndLaneQuery(any(), any())) + .thenAnswer(i -> ((QueryPlus) i.getArgument(0)).getQuery()); + + Mockito.doReturn(Optional.of(timeline)).when(serverView).getTimeline(any()); + Mockito.doReturn(new MockQueryRunner()).when(serverView).getQueryRunner(any()); + CachingClusteredClient cachingClusteredClient = new CachingClusteredClient( + new MockQueryToolChestWareHouse(), + serverView, + MapCache.create(1024), + TestHelper.makeJsonMapper(), + Mockito.mock(CachePopulator.class), + new CacheConfig(), + Mockito.mock(DruidHttpClientConfig.class), + Mockito.mock(DruidProcessingConfig.class), + ForkJoinPool.commonPool(), + queryScheduler, + NoopJoinableFactory.INSTANCE + ); + + Query fakeQuery = makeFakeQuery(interval); + QueryRunner queryRunner = cachingClusteredClient.getQueryRunnerForSegments( + fakeQuery, + segmentDescriptors + ); + Sequence sequence = queryRunner.run(QueryPlus.wrap(fakeQuery)); + Assert.assertEquals(segmentDescriptors, sequence.toList()); + } + + private Query makeFakeQuery(Interval interval) + { + return new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(Collections.singletonList(interval)), + false, + ImmutableMap.of(BaseQuery.QUERY_ID, "testQuery") + ); + } + + private DataSegment makeDataSegment(String dataSource, Interval interval, String version, int partition) + { + return DataSegment.builder() + .dataSource(dataSource) + .interval(interval) + .version(version) + .shardSpec(new LinearShardSpec(partition)) + .size(1) + .build(); + } + + private static class MockQueryToolChestWareHouse implements QueryToolChestWarehouse + { + + @Override + public > QueryToolChest getToolChest(QueryType query) + { + return new ServerManagerTest.NoopQueryToolChest<>(); + } + } + + private static class MockQueryRunner implements QueryRunner + { + + @Override + public Sequence run( + QueryPlus queryPlus, + ResponseContext responseContext + ) + { + TestQuery query = (TestQuery) queryPlus.getQuery(); + return TestSequence.create(((MultipleSpecificSegmentSpec) query.getSpec()).getDescriptors()); + } + } + + private static class TestQuery extends BaseQuery + { + private QuerySegmentSpec spec; + + public TestQuery( + DataSource dataSource, + QuerySegmentSpec querySegmentSpec, + boolean descending, + Map context + ) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return "string"; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return this; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + this.spec = spec; + return this; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return this; + } + + public QuerySegmentSpec getSpec() + { + return spec; + } + } + +} diff --git a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java index f6e26989ead..961303ca57b 100644 --- a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java @@ -101,18 +101,20 @@ public class CoordinatorServerViewTest extends CuratorTestBase setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); + final int partition = segment.getShardSpec().getPartitionNum(); + final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D"); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view")); List serverLookupRes = (List) timeline.lookup( - Intervals.of("2014-10-20T00:00:00Z/P1D") + intervals ); Assert.assertEquals(1, serverLookupRes.size()); TimelineObjectHolder actualTimelineObjectHolder = serverLookupRes.get(0); - Assert.assertEquals(Intervals.of("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval()); + Assert.assertEquals(intervals, actualTimelineObjectHolder.getInterval()); Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject(); @@ -125,6 +127,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()) ); + Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition)); unannounceSegmentForServer(druidServer, segment); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -133,7 +136,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase 0, ((List) timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D"))).size() ); - Assert.assertNull(timeline.findEntry(Intervals.of("2014-10-20T00:00:00Z/P1D"), "v1")); + Assert.assertNull(timeline.findChunk(intervals, "v1", partition)); } @Test diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 3ba135f9b6c..324eb14dc35 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -53,7 +53,6 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -261,11 +260,12 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker final List retVal = new ArrayList<>(); for (SegmentDescriptor spec : specs) { - final PartitionHolder entry = timeline.findEntry( + final PartitionChunk entry = timeline.findChunk( spec.getInterval(), - spec.getVersion() + spec.getVersion(), + spec.getPartitionNumber() ); - retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval())); + retVal.add(new WindowedSegment(entry.getObject(), spec.getInterval())); } return retVal;