Server selector improvement (#4315)

* Do not re-create prioritized servers on each call in server selector and extend TierSelectorStrategy interface with a method to pick multiple elements at once

* Fix compilation
This commit is contained in:
dgolitsyn 2017-05-26 20:02:09 +04:00 committed by Roman Leventov
parent 1eaa7887bd
commit 515fabce96
9 changed files with 167 additions and 92 deletions

View File

@ -19,12 +19,13 @@
package io.druid.client.selector; package io.druid.client.selector;
import io.druid.java.util.common.ISE; import com.google.common.collect.Iterables;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import java.util.Map; import java.util.ArrayList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
/** /**
*/ */
@ -39,23 +40,27 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate
@Override @Override
public QueryableDruidServer pick( public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment
) )
{ {
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry(); return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null);
}
if (priorityServers == null) { @Override
return null; public List<QueryableDruidServer> pick(
} Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
final Set<QueryableDruidServer> servers = priorityServers.getValue(); int numServersToPick
switch (servers.size()) { )
case 0: {
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier()); List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
case 1: for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
return priorityServers.getValue().iterator().next(); result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size()));
default: if (result.size() == numServersToPick) {
return serverSelectorStrategy.pick(servers, segment); break;
}
} }
return result;
} }
} }

View File

@ -19,11 +19,14 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import java.util.Set; import java.util.Set;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
@ -42,4 +45,15 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
{ {
return Collections.min(servers, comparator); return Collections.min(servers, comparator);
} }
@Override
public List<QueryableDruidServer> pick(
Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick
)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
return Ordering.from(comparator).leastOf(servers, numServersToPick);
}
} }

View File

@ -19,19 +19,32 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.Random; import java.util.Collections;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{ {
private static final Random random = new Random();
@Override @Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment) public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{ {
return Iterators.get(servers.iterator(), random.nextInt(servers.size())); return Iterators.get(servers.iterator(), ThreadLocalRandom.current().nextInt(servers.size()));
}
@Override
public List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
List<QueryableDruidServer> list = Lists.newArrayList(servers);
Collections.shuffle(list, ThreadLocalRandom.current());
return ImmutableList.copyOf(list.subList(0, numServersToPick));
} }
} }

View File

@ -19,27 +19,23 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/** /**
*/ */
public class ServerSelector implements DiscoverySelector<QueryableDruidServer> public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{ {
private static final EmittingLogger log = new EmittingLogger(ServerSelector.class); private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final TierSelectorStrategy strategy; private final TierSelectorStrategy strategy;
@ -50,8 +46,9 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
TierSelectorStrategy strategy TierSelectorStrategy strategy
) )
{ {
this.segment = new AtomicReference<DataSegment>(segment); this.segment = new AtomicReference<>(segment);
this.strategy = strategy; this.strategy = strategy;
this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
} }
public DataSegment getSegment() public DataSegment getSegment()
@ -65,14 +62,25 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{ {
synchronized (this) { synchronized (this) {
this.segment.set(segment); this.segment.set(segment);
servers.add(server); int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>());
priorityServers.add(server);
} }
} }
public boolean removeServer(QueryableDruidServer server) public boolean removeServer(QueryableDruidServer server)
{ {
synchronized (this) { synchronized (this) {
return servers.remove(server); int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.get(priority);
if (priorityServers == null) {
return false;
}
boolean result = priorityServers.remove(server);
if (priorityServers.isEmpty()) {
servers.remove(priority);
}
return result;
} }
} }
@ -84,49 +92,28 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
} }
public List<DruidServerMetadata> getCandidates(final int numCandidates) { public List<DruidServerMetadata> getCandidates(final int numCandidates) {
List<DruidServerMetadata> result = Lists.newArrayList();
synchronized (this) { synchronized (this) {
final DataSegment target = segment.get(); if (numCandidates > 0) {
for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) { return strategy.pick(servers, segment.get(), numCandidates)
Set<QueryableDruidServer> servers = entry.getValue(); .stream()
TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap(); .map(server -> server.getServer().getMetadata())
while (!servers.isEmpty()) { .collect(Collectors.toList());
tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry } else {
QueryableDruidServer server = strategy.pick(tieredMap, target); // return all servers as candidates
if (server == null) { return servers.values()
// regard this as any server in tieredMap is not appropriate .stream()
break; .flatMap(Collection::stream)
} .map(server -> server.getServer().getMetadata())
result.add(server.getServer().getMetadata()); .collect(Collectors.toList());
if (numCandidates > 0 && result.size() >= numCandidates) {
return result;
}
servers.remove(server);
}
} }
} }
return result;
} }
@Override @Override
public QueryableDruidServer pick() public QueryableDruidServer pick()
{ {
synchronized (this) { synchronized (this) {
return strategy.pick(toPrioritizedServers(), segment.get()); return strategy.pick(servers, segment.get());
} }
} }
private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
{
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
}
theServers.add(server);
}
return prioritizedServers;
}
} }

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Set; import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@ -32,5 +33,7 @@ import java.util.Set;
}) })
public interface ServerSelectorStrategy public interface ServerSelectorStrategy
{ {
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment); QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
} }

View File

@ -22,10 +22,11 @@ package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
/** /**
*/ */
@ -37,7 +38,13 @@ import java.util.TreeMap;
}) })
public interface TierSelectorStrategy public interface TierSelectorStrategy
{ {
public Comparator<Integer> getComparator(); Comparator<Integer> getComparator();
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment); QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
);
} }

View File

@ -40,6 +40,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk; import io.druid.timeline.partition.SingleElementPartitionChunk;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
@ -47,17 +48,18 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
*/ */
public class CachingClusteredClientFunctionalityTest { public class CachingClusteredClientFunctionalityTest
{
public CachingClusteredClient client; public CachingClusteredClient client;
@ -75,17 +77,22 @@ public class CachingClusteredClientFunctionalityTest {
} }
@Test @Test
public void testUncoveredInterval() throws Exception { public void testUncoveredInterval() throws Exception
{
addToTimeline(new Interval("2015-01-02/2015-01-03"), "1"); addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
addToTimeline(new Interval("2015-01-04/2015-01-05"), "1"); addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
addToTimeline(new Interval("2015-02-04/2015-02-05"), "1"); addToTimeline(new Interval("2015-02-04/2015-02-05"), "1");
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource("test") .dataSource("test")
.intervals("2015-01-02/2015-01-03") .intervals("2015-01-02/2015-01-03")
.granularity("day") .granularity("day")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows"))) .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory(
.context(ImmutableMap.<String, Object>of("uncoveredIntervalsLimit", 3)); "rows")))
.context(ImmutableMap.<String, Object>of(
"uncoveredIntervalsLimit",
3
));
Map<String, Object> responseContext = new HashMap<>(); Map<String, Object> responseContext = new HashMap<>();
client.run(builder.build(), responseContext); client.run(builder.build(), responseContext);
@ -132,7 +139,8 @@ public class CachingClusteredClientFunctionalityTest {
assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04"); assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
} }
private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals) { private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals)
{
List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length); List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
for (String interval : intervals) { for (String interval : intervals) {
expectedList.add(new Interval(interval)); expectedList.add(new Interval(interval));
@ -141,28 +149,49 @@ public class CachingClusteredClientFunctionalityTest {
Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed")); Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed"));
} }
private void addToTimeline(Interval interval, String version) { private void addToTimeline(Interval interval, String version)
{
timeline.add(interval, version, new SingleElementPartitionChunk<>( timeline.add(interval, version, new SingleElementPartitionChunk<>(
new ServerSelector( new ServerSelector(
DataSegment.builder() DataSegment.builder()
.dataSource("test") .dataSource("test")
.interval(interval) .interval(interval)
.version(version) .version(version)
.shardSpec(NoneShardSpec.instance()) .shardSpec(NoneShardSpec.instance())
.build(), .build(),
new TierSelectorStrategy() { new TierSelectorStrategy()
{
@Override @Override
public Comparator<Integer> getComparator() { public Comparator<Integer> getComparator()
{
return Ordering.natural(); return Ordering.natural();
} }
@Override @Override
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment) { public QueryableDruidServer pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
)
{
return new QueryableDruidServer( return new QueryableDruidServer(
new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10), new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10),
EasyMock.createNiceMock(DirectDruidClient.class) EasyMock.createNiceMock(DirectDruidClient.class)
); );
} }
@Override
public List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
)
{
return Collections.singletonList(
new QueryableDruidServer(
new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10),
EasyMock.createNiceMock(DirectDruidClient.class)
)
);
}
} }
) )
)); ));

View File

@ -21,17 +21,29 @@ package io.druid.client.selector;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.druid.client.DirectDruidClient;
import io.druid.client.DruidServer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
*/ */
public class ServerSelectorTest public class ServerSelectorTest
{ {
TierSelectorStrategy tierSelectorStrategy;
@Before
public void setUp() throws Exception
{
tierSelectorStrategy = EasyMock.createMock(TierSelectorStrategy.class);
EasyMock.expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes();
}
@Test @Test
public void testSegmentUpdate() throws Exception public void testSegmentUpdate() throws Exception
@ -59,7 +71,10 @@ public class ServerSelectorTest
); );
selector.addServerAndUpdateSegment( selector.addServerAndUpdateSegment(
EasyMock.createMock(QueryableDruidServer.class), new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 1),
EasyMock.createMock(DirectDruidClient.class)
),
DataSegment.builder() DataSegment.builder()
.dataSource( .dataSource(
"test_broker_server_view") "test_broker_server_view")

View File

@ -43,6 +43,8 @@ import com.google.common.collect.Ordering;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.client.FilteredServerInventoryView; import io.druid.client.FilteredServerInventoryView;
import io.druid.client.TimelineServerView; import io.druid.client.TimelineServerView;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.RandomServerSelectorStrategy;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.query.TableDataSource; import io.druid.query.TableDataSource;
import io.druid.query.metadata.SegmentMetadataQueryConfig; import io.druid.query.metadata.SegmentMetadataQueryConfig;
@ -378,7 +380,7 @@ public class ClientInfoResourceTest
.size(1) .size(1)
.build(); .build();
server.addDataSegment(segment.getIdentifier(), segment); server.addDataSegment(segment.getIdentifier(), segment);
ServerSelector ss = new ServerSelector(segment, null); ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
timeline.add(new Interval(interval), version, new SingleElementPartitionChunk<ServerSelector>(ss)); timeline.add(new Interval(interval), version, new SingleElementPartitionChunk<ServerSelector>(ss));
} }
@ -402,7 +404,7 @@ public class ClientInfoResourceTest
.size(1) .size(1)
.build(); .build();
server.addDataSegment(segment.getIdentifier(), segment); server.addDataSegment(segment.getIdentifier(), segment);
ServerSelector ss = new ServerSelector(segment, null); ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
timeline.add(new Interval(interval), version, shardSpec.createChunk(ss)); timeline.add(new Interval(interval), version, shardSpec.createChunk(ss));
} }