allow server selection to be aware of query (#10428)

* add query through to server selector

* add nullable extensions, deprecate old methods with defaults

* style changes

* add nullable to ServerSelectorStrategy

* fix test coverage

* missing override in test

* add null check
This commit is contained in:
keefe roedersheimer 2020-12-18 16:56:19 -05:00 committed by GitHub
parent 6f2ce8f0a5
commit ca3b925133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 143 additions and 35 deletions

View File

@ -588,7 +588,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
{ {
final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>(); final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>();
for (SegmentServerSelector segmentServer : segments) { for (SegmentServerSelector segmentServer : segments) {
final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(); final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(query);
if (queryableDruidServer == null) { if (queryableDruidServer == null) {
log.makeAlert( log.makeAlert(
@ -812,7 +812,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
Hasher hasher = Hashing.sha1().newHasher(); Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true; boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) { for (SegmentServerSelector p : segments) {
if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) { QueryableDruidServer queryableServer = p.getServer().pick(query);
if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) {
hasOnlyHistoricalSegments = false; hasOnlyHistoricalSegments = false;
break; break;
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.client.selector;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -41,16 +42,18 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate
@Nullable @Nullable
@Override @Override
public QueryableDruidServer pick( public <T> QueryableDruidServer pick(
Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment DataSegment segment
) )
{ {
return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null); return Iterables.getOnlyElement(pick(query, prioritizedServers, segment, 1), null);
} }
@Override @Override
public List<QueryableDruidServer> pick( public <T> List<QueryableDruidServer> pick(
Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment, DataSegment segment,
int numServersToPick int numServersToPick
@ -58,7 +61,7 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate
{ {
List<QueryableDruidServer> result = new ArrayList<>(numServersToPick); List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) { for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size())); result.addAll(serverSelectorStrategy.pick(query, priorityServers, segment, numServersToPick - result.size()));
if (result.size() == numServersToPick) { if (result.size() == numServersToPick) {
break; break;
} }

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Ordering;
import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -34,6 +35,7 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
private static final Comparator<QueryableDruidServer> COMPARATOR = private static final Comparator<QueryableDruidServer> COMPARATOR =
Comparator.comparingInt(s -> ((DirectDruidClient) s.getQueryRunner()).getNumOpenConnections()); Comparator.comparingInt(s -> ((DirectDruidClient) s.getQueryRunner()).getNumOpenConnections());
@Nullable
@Override @Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment) public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{ {

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -31,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{ {
@Nullable
@Override @Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment) public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{ {

View File

@ -21,6 +21,7 @@ package org.apache.druid.client.selector;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.query.Query;
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>, Overshadowable<ServerSelector> public class ServerSelector implements Overshadowable<ServerSelector>
{ {
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> historicalServers; private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> historicalServers;
@ -160,14 +161,13 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>,
} }
@Nullable @Nullable
@Override public <T> QueryableDruidServer pick(@Nullable Query<T> query)
public QueryableDruidServer pick()
{ {
synchronized (this) { synchronized (this) {
if (!historicalServers.isEmpty()) { if (!historicalServers.isEmpty()) {
return strategy.pick(historicalServers, segment.get()); return strategy.pick(query, historicalServers, segment.get());
} }
return strategy.pick(realtimeServers, segment.get()); return strategy.pick(query, realtimeServers, segment.get());
} }
} }

View File

@ -21,8 +21,11 @@ package org.apache.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Iterables;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -33,7 +36,29 @@ import java.util.Set;
}) })
public interface ServerSelectorStrategy public interface ServerSelectorStrategy
{ {
QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment); @Nullable
default <T> QueryableDruidServer pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment)
{
return Iterables.getOnlyElement(pick(query, servers, segment, 1), null);
}
default <T> List<QueryableDruidServer> pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment,
int numServersToPick)
{
return pick(servers, segment, numServersToPick);
}
@Deprecated
@Nullable
default QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
return pick(null, servers, segment);
}
@Deprecated
default List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
{
return pick(null, servers, segment, numServersToPick);
}
List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
} }

View File

@ -22,9 +22,11 @@ package org.apache.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -41,12 +43,36 @@ public interface TierSelectorStrategy
{ {
Comparator<Integer> getComparator(); Comparator<Integer> getComparator();
@Deprecated
@Nullable @Nullable
QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment); default QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
{
return pick(null, prioritizedServers, segment);
}
List<QueryableDruidServer> pick( @Deprecated
default List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment, DataSegment segment,
int numServersToPick int numServersToPick)
); {
return pick(null, prioritizedServers, segment, numServersToPick);
}
@Nullable
default <T> QueryableDruidServer pick(@Nullable Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment)
{
return pick(prioritizedServers, segment);
}
default <T> List<QueryableDruidServer> pick(
@Nullable Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick)
{
return pick(prioritizedServers, segment, numServersToPick);
}
} }

View File

@ -135,7 +135,7 @@ public class BrokerServerViewTest extends CuratorTestBase
ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject(); ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty()); Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(segment, selector.getSegment());
Assert.assertEquals(druidServer, selector.pick().getServer()); Assert.assertEquals(druidServer, selector.pick(null).getServer());
unannounceSegmentForServer(druidServer, segment, zkPathsConfig); unannounceSegmentForServer(druidServer, segment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@ -384,7 +384,7 @@ public class BrokerServerViewTest extends CuratorTestBase
ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator() ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
.next()).getObject(); .next()).getObject();
Assert.assertFalse(selector.isEmpty()); Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick().getServer()); Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null).getServer());
Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment()); Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment());
} }
} }

View File

@ -332,7 +332,7 @@ public class CachingClusteredClientCacheKeyManagerTest extends EasyMockSupport
0 0
); );
expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes(); expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes();
expect(serverSelector.pick()).andReturn(queryableDruidServer).anyTimes(); expect(serverSelector.pick(query)).andReturn(queryableDruidServer).anyTimes();
expect(queryableDruidServer.getServer()).andReturn(server).anyTimes(); expect(queryableDruidServer.getServer()).andReturn(server).anyTimes();
expect(serverSelector.getSegment()).andReturn(segment).anyTimes(); expect(serverSelector.getSegment()).andReturn(segment).anyTimes();
replay(serverSelector, queryableDruidServer, server); replay(serverSelector, queryableDruidServer, server);

View File

@ -223,7 +223,7 @@ public class DirectDruidClientTest
Assert.assertEquals(2, client2.getNumOpenConnections()); Assert.assertEquals(2, client2.getNumOpenConnections());
Assert.assertEquals(serverSelector.pick(), queryableDruidServer2); Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2);
EasyMock.verify(httpClient); EasyMock.verify(httpClient);
} }

View File

@ -23,6 +23,7 @@ import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer; import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Query;
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -31,15 +32,37 @@ import org.easymock.EasyMock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
public class TierSelectorStrategyTest public class TierSelectorStrategyTest
{ {
@Test
public void testHighestPriorityTierSelectorStrategyRealtime()
{
DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
QueryableDruidServer lowPriority = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
client
);
QueryableDruidServer highPriority = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 1),
client
);
testTierSelectorStrategy(
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()),
highPriority, lowPriority
);
}
@Test @Test
public void testHighestPriorityTierSelectorStrategy() public void testHighestPriorityTierSelectorStrategy()
{ {
@ -168,19 +191,19 @@ public class TierSelectorStrategyTest
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 3), new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 3),
client client
); );
TierSelectorStrategy tierSelectorStrategy = new CustomTierSelectorStrategy(
new ConnectionCountServerSelectorStrategy(),
new CustomTierSelectorStrategyConfig()
{
@Override
public List<Integer> getPriorities()
{
return Arrays.asList(2, 0, -1);
}
}
);
testTierSelectorStrategy( testTierSelectorStrategy(
new CustomTierSelectorStrategy( tierSelectorStrategy,
new ConnectionCountServerSelectorStrategy(),
new CustomTierSelectorStrategyConfig()
{
@Override
public List<Integer> getPriorities()
{
return Arrays.asList(2, 0, -1);
}
}
),
p3, p1, p0, p4, p2 p3, p1, p0, p4, p2
); );
} }
@ -216,9 +239,35 @@ public class TierSelectorStrategyTest
serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment()); serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment());
} }
Assert.assertEquals(expectedSelection[0], serverSelector.pick()); Assert.assertEquals(expectedSelection[0], serverSelector.pick(null));
Assert.assertEquals(expectedSelection[0], serverSelector.pick(EasyMock.createMock(Query.class)));
Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1)); Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1));
Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2)); Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2));
} }
@Test
public void testServerSelectorStrategyDefaults()
{
DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
QueryableDruidServer p0 = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1),
client
);
Set<QueryableDruidServer> servers = new HashSet<>();
servers.add(p0);
RandomServerSelectorStrategy strategy = new RandomServerSelectorStrategy();
Assert.assertEquals(strategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0);
Assert.assertEquals(strategy.pick(EasyMock.createMock(Query.class), servers, EasyMock.createMock(DataSegment.class)), p0);
ServerSelectorStrategy defaultDeprecatedServerSelectorStrategy = new ServerSelectorStrategy() {
@Override
public <T> List<QueryableDruidServer> pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment,
int numServersToPick)
{
return strategy.pick(servers, segment, numServersToPick);
}
};
Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0);
Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class), 1).get(0), p0);
}
} }