diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 439fa6c6d75..d1adc82cebb 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -588,7 +588,7 @@ public class CachingClusteredClient implements QuerySegmentWalker { final SortedMap> serverSegments = new TreeMap<>(); for (SegmentServerSelector segmentServer : segments) { - final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(); + final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(query); if (queryableDruidServer == null) { log.makeAlert( @@ -812,7 +812,8 @@ public class CachingClusteredClient implements QuerySegmentWalker Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; for (SegmentServerSelector p : segments) { - if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) { + QueryableDruidServer queryableServer = p.getServer().pick(query); + if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) { hasOnlyHistoricalSegments = false; break; } diff --git a/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java index 43991af6fbc..01c38d93b7b 100644 --- a/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java @@ -21,6 +21,7 @@ package org.apache.druid.client.selector; import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.query.Query; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -38,19 +39,21 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate { this.serverSelectorStrategy = serverSelectorStrategy; } - + @Nullable @Override - public QueryableDruidServer pick( + public QueryableDruidServer pick( + Query query, Int2ObjectRBTreeMap> prioritizedServers, DataSegment segment ) { - return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null); + return Iterables.getOnlyElement(pick(query, prioritizedServers, segment, 1), null); } @Override - public List pick( + public List pick( + Query query, Int2ObjectRBTreeMap> prioritizedServers, DataSegment segment, int numServersToPick @@ -58,7 +61,7 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate { List result = new ArrayList<>(numServersToPick); for (Set priorityServers : prioritizedServers.values()) { - result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size())); + result.addAll(serverSelectorStrategy.pick(query, priorityServers, segment, numServersToPick - result.size())); if (result.size() == numServersToPick) { break; } diff --git a/server/src/main/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategy.java index 542d608e41c..637db777b95 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategy.java @@ -24,6 +24,7 @@ import com.google.common.collect.Ordering; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -34,6 +35,7 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra private static final Comparator COMPARATOR = Comparator.comparingInt(s -> ((DirectDruidClient) s.getQueryRunner()).getNumOpenConnections()); + @Nullable @Override public QueryableDruidServer pick(Set servers, DataSegment segment) { diff --git a/server/src/main/java/org/apache/druid/client/selector/RandomServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/RandomServerSelectorStrategy.java index 4b6833140f2..ade23dd262a 100644 --- a/server/src/main/java/org/apache/druid/client/selector/RandomServerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/RandomServerSelectorStrategy.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.Set; @@ -31,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; public class RandomServerSelectorStrategy implements ServerSelectorStrategy { + @Nullable @Override public QueryableDruidServer pick(Set servers, DataSegment segment) { diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 242908d3350..db9ad7b98a1 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -21,6 +21,7 @@ package org.apache.druid.client.selector; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import org.apache.druid.client.DataSegmentInterner; +import org.apache.druid.query.Query; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; /** */ -public class ServerSelector implements DiscoverySelector, Overshadowable +public class ServerSelector implements Overshadowable { private final Int2ObjectRBTreeMap> historicalServers; @@ -160,14 +161,13 @@ public class ServerSelector implements DiscoverySelector, } @Nullable - @Override - public QueryableDruidServer pick() + public QueryableDruidServer pick(@Nullable Query query) { synchronized (this) { if (!historicalServers.isEmpty()) { - return strategy.pick(historicalServers, segment.get()); + return strategy.pick(query, historicalServers, segment.get()); } - return strategy.pick(realtimeServers, segment.get()); + return strategy.pick(query, realtimeServers, segment.get()); } } diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java index 79110d39a4a..587762068c5 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelectorStrategy.java @@ -21,8 +21,11 @@ package org.apache.druid.client.selector; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.Iterables; +import org.apache.druid.query.Query; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.List; import java.util.Set; @@ -33,7 +36,29 @@ import java.util.Set; }) public interface ServerSelectorStrategy { - QueryableDruidServer pick(Set servers, DataSegment segment); + @Nullable + default QueryableDruidServer pick(@Nullable Query query, Set servers, DataSegment segment) + { + return Iterables.getOnlyElement(pick(query, servers, segment, 1), null); + } + + default List pick(@Nullable Query query, Set servers, DataSegment segment, + int numServersToPick) + { + return pick(servers, segment, numServersToPick); + } + + @Deprecated + @Nullable + default QueryableDruidServer pick(Set servers, DataSegment segment) + { + return pick(null, servers, segment); + } + + @Deprecated + default List pick(Set servers, DataSegment segment, int numServersToPick) + { + return pick(null, servers, segment, numServersToPick); + } - List pick(Set servers, DataSegment segment, int numServersToPick); } diff --git a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java index 69987f89f42..e73ed2fabc0 100644 --- a/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/client/selector/TierSelectorStrategy.java @@ -22,9 +22,11 @@ package org.apache.druid.client.selector; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.query.Query; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; + import java.util.Comparator; import java.util.List; import java.util.Set; @@ -41,12 +43,36 @@ public interface TierSelectorStrategy { Comparator getComparator(); + @Deprecated @Nullable - QueryableDruidServer pick(Int2ObjectRBTreeMap> prioritizedServers, DataSegment segment); + default QueryableDruidServer pick(Int2ObjectRBTreeMap> prioritizedServers, DataSegment segment) + { + return pick(null, prioritizedServers, segment); + } - List pick( + @Deprecated + default List pick( Int2ObjectRBTreeMap> prioritizedServers, DataSegment segment, - int numServersToPick - ); + int numServersToPick) + { + return pick(null, prioritizedServers, segment, numServersToPick); + } + + @Nullable + default QueryableDruidServer pick(@Nullable Query query, + Int2ObjectRBTreeMap> prioritizedServers, + DataSegment segment) + { + return pick(prioritizedServers, segment); + } + + default List pick( + @Nullable Query query, + Int2ObjectRBTreeMap> prioritizedServers, + DataSegment segment, + int numServersToPick) + { + return pick(prioritizedServers, segment, numServersToPick); + } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 30b93a18c84..7878e6afdc9 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -135,7 +135,7 @@ public class BrokerServerViewTest extends CuratorTestBase ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject(); Assert.assertFalse(selector.isEmpty()); Assert.assertEquals(segment, selector.getSegment()); - Assert.assertEquals(druidServer, selector.pick().getServer()); + Assert.assertEquals(druidServer, selector.pick(null).getServer()); unannounceSegmentForServer(druidServer, segment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -384,7 +384,7 @@ public class BrokerServerViewTest extends CuratorTestBase ServerSelector selector = ((SingleElementPartitionChunk) actualPartitionHolder.iterator() .next()).getObject(); Assert.assertFalse(selector.isEmpty()); - Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick().getServer()); + Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null).getServer()); Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment()); } } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java index a8fe344552e..7970c8c893c 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java @@ -332,7 +332,7 @@ public class CachingClusteredClientCacheKeyManagerTest extends EasyMockSupport 0 ); expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes(); - expect(serverSelector.pick()).andReturn(queryableDruidServer).anyTimes(); + expect(serverSelector.pick(query)).andReturn(queryableDruidServer).anyTimes(); expect(queryableDruidServer.getServer()).andReturn(server).anyTimes(); expect(serverSelector.getSegment()).andReturn(segment).anyTimes(); replay(serverSelector, queryableDruidServer, server); diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index c38bcd4ce9a..6322f730be9 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -223,7 +223,7 @@ public class DirectDruidClientTest Assert.assertEquals(2, client2.getNumOpenConnections()); - Assert.assertEquals(serverSelector.pick(), queryableDruidServer2); + Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2); EasyMock.verify(httpClient); } diff --git a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java index 066275fc88d..fa17aabb5c4 100644 --- a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java @@ -23,6 +23,7 @@ import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.Query; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -31,15 +32,37 @@ import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class TierSelectorStrategyTest { + + @Test + public void testHighestPriorityTierSelectorStrategyRealtime() + { + DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class); + QueryableDruidServer lowPriority = new QueryableDruidServer( + new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), + client + ); + QueryableDruidServer highPriority = new QueryableDruidServer( + new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 1), + client + ); + testTierSelectorStrategy( + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), + highPriority, lowPriority + ); + } + @Test public void testHighestPriorityTierSelectorStrategy() { @@ -168,19 +191,19 @@ public class TierSelectorStrategyTest new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 3), client ); - + TierSelectorStrategy tierSelectorStrategy = new CustomTierSelectorStrategy( + new ConnectionCountServerSelectorStrategy(), + new CustomTierSelectorStrategyConfig() + { + @Override + public List getPriorities() + { + return Arrays.asList(2, 0, -1); + } + } + ); testTierSelectorStrategy( - new CustomTierSelectorStrategy( - new ConnectionCountServerSelectorStrategy(), - new CustomTierSelectorStrategyConfig() - { - @Override - public List getPriorities() - { - return Arrays.asList(2, 0, -1); - } - } - ), + tierSelectorStrategy, p3, p1, p0, p4, p2 ); } @@ -216,9 +239,35 @@ public class TierSelectorStrategyTest serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment()); } - Assert.assertEquals(expectedSelection[0], serverSelector.pick()); + Assert.assertEquals(expectedSelection[0], serverSelector.pick(null)); + Assert.assertEquals(expectedSelection[0], serverSelector.pick(EasyMock.createMock(Query.class))); Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1)); Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2)); } + + @Test + public void testServerSelectorStrategyDefaults() + { + DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class); + QueryableDruidServer p0 = new QueryableDruidServer( + new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1), + client + ); + Set servers = new HashSet<>(); + servers.add(p0); + RandomServerSelectorStrategy strategy = new RandomServerSelectorStrategy(); + Assert.assertEquals(strategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0); + Assert.assertEquals(strategy.pick(EasyMock.createMock(Query.class), servers, EasyMock.createMock(DataSegment.class)), p0); + ServerSelectorStrategy defaultDeprecatedServerSelectorStrategy = new ServerSelectorStrategy() { + @Override + public List pick(@Nullable Query query, Set servers, DataSegment segment, + int numServersToPick) + { + return strategy.pick(servers, segment, numServersToPick); + } + }; + Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0); + Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class), 1).get(0), p0); + } }