diff --git a/server/src/main/java/io/druid/client/selector/AbstractTierSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/AbstractTierSelectorStrategy.java
index 8de7dfa6ecc..60fcac019dc 100644
--- a/server/src/main/java/io/druid/client/selector/AbstractTierSelectorStrategy.java
+++ b/server/src/main/java/io/druid/client/selector/AbstractTierSelectorStrategy.java
@@ -19,12 +19,13 @@

 package io.druid.client.selector;

-import io.druid.java.util.common.ISE;
+import com.google.common.collect.Iterables;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
-import java.util.TreeMap;

 /**
  */
@@ -39,23 +40,27 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate

   @Override
   public QueryableDruidServer pick(
-      TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
+      Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+      DataSegment segment
   )
   {
-    final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry();
+    return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null);
+  }

-    if (priorityServers == null) {
-      return null;
-    }
-
-    final Set<QueryableDruidServer> servers = priorityServers.getValue();
-    switch (servers.size()) {
-      case 0:
-        throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
-      case 1:
-        return priorityServers.getValue().iterator().next();
-      default:
-        return serverSelectorStrategy.pick(servers, segment);
+  @Override
+  public List<QueryableDruidServer> pick(
+      Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+      DataSegment segment,
+      int numServersToPick
+  )
+  {
+    List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
+    for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
+      result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size()));
+      if (result.size() == numServersToPick) {
+        break;
+      }
     }
+    return result;
   }
 }
diff --git a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java
index 8fdf6847938..1c525267f7f 100644
--- a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java
+++ b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java
@@ -19,11 +19,14 @@

 package io.druid.client.selector;

+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
 import com.google.common.primitives.Ints;
 import io.druid.timeline.DataSegment;

 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Set;

 public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
@@ -42,4 +45,15 @@ public class ConnectionCountServerSelectorStra
   {
     return Collections.min(servers, comparator);
   }
+
+  @Override
+  public List<QueryableDruidServer> pick(
+      Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick
+  )
+  {
+    if (servers.size() <= numServersToPick) {
+      return ImmutableList.copyOf(servers);
+    }
+    return Ordering.from(comparator).leastOf(servers, numServersToPick);
+  }
 }
diff --git a/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java
index 91360e93b0d..439c6894594 100644
--- a/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java
+++ b/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java
@@ -19,19 +19,32 @@

 package io.druid.client.selector;

+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import io.druid.timeline.DataSegment;

-import java.util.Random;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;

 public class RandomServerSelectorStrategy implements ServerSelectorStrategy
 {
-  private static final Random random = new Random();
-
   @Override
   public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
   {
-    return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
+    return Iterators.get(servers.iterator(), ThreadLocalRandom.current().nextInt(servers.size()));
+  }
+
+  @Override
+  public List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
+  {
+    if (servers.size() <= numServersToPick) {
+      return ImmutableList.copyOf(servers);
+    }
+    List<QueryableDruidServer> list = Lists.newArrayList(servers);
+    Collections.shuffle(list, ThreadLocalRandom.current());
+    return ImmutableList.copyOf(list.subList(0, numServersToPick));
   }
 }
diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java
index a640fe59652..d04b66bf1b2 100644
--- a/server/src/main/java/io/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java
@@ -19,27 +19,23 @@

 package io.druid.client.selector;

-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.metamx.emitter.EmittingLogger;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;

 /**
  */
 public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
 {
-  private static final EmittingLogger log = new EmittingLogger(ServerSelector.class);
-
-  private final Set<QueryableDruidServer> servers = Sets.newHashSet();
+  private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;

   private final TierSelectorStrategy strategy;

@@ -50,8 +46,9 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
       TierSelectorStrategy strategy
   )
   {
-    this.segment = new AtomicReference<DataSegment>(segment);
+    this.segment = new AtomicReference<>(segment);
     this.strategy = strategy;
+    this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
   }

   public DataSegment getSegment()
@@ -65,14 +62,25 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
   {
     synchronized (this) {
       this.segment.set(segment);
-      servers.add(server);
+      int priority = server.getServer().getPriority();
+      Set<QueryableDruidServer> priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>());
+      priorityServers.add(server);
     }
   }

   public boolean removeServer(QueryableDruidServer server)
   {
     synchronized (this) {
-      return servers.remove(server);
+      int priority = server.getServer().getPriority();
+      Set<QueryableDruidServer> priorityServers = servers.get(priority);
+      if (priorityServers == null) {
+        return false;
+      }
+      boolean result = priorityServers.remove(server);
+      if (priorityServers.isEmpty()) {
+        servers.remove(priority);
+      }
+      return result;
     }
   }

@@ -84,49 +92,28 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
   }

   public List<DruidServerMetadata> getCandidates(final int numCandidates) {
-    List<DruidServerMetadata> result = Lists.newArrayList();
     synchronized (this) {
-      final DataSegment target = segment.get();
-      for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) {
-        Set<QueryableDruidServer> servers = entry.getValue();
-        TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap();
-        while (!servers.isEmpty()) {
-          tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry
-          QueryableDruidServer server = strategy.pick(tieredMap, target);
-          if (server == null) {
-            // regard this as any server in tieredMap is not appropriate
-            break;
-          }
-          result.add(server.getServer().getMetadata());
-          if (numCandidates > 0 && result.size() >= numCandidates) {
-            return result;
-          }
-          servers.remove(server);
-        }
+      if (numCandidates > 0) {
+        return strategy.pick(servers, segment.get(), numCandidates)
+                       .stream()
+                       .map(server -> server.getServer().getMetadata())
+                       .collect(Collectors.toList());
+      } else {
+        // return all servers as candidates
+        return servers.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .map(server -> server.getServer().getMetadata())
+                      .collect(Collectors.toList());
       }
     }
-    return result;
   }

   @Override
   public QueryableDruidServer pick()
   {
     synchronized (this) {
-      return strategy.pick(toPrioritizedServers(), segment.get());
+      return strategy.pick(servers, segment.get());
     }
   }
-
-  private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
-  {
-    final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
-    for (QueryableDruidServer server : servers) {
-      Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
-      if (theServers == null) {
-        theServers = Sets.newHashSet();
-        prioritizedServers.put(server.getServer().getPriority(), theServers);
-      }
-      theServers.add(server);
-    }
-    return prioritizedServers;
-  }
 }
diff --git a/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java
index 9caf7179f3d..506ead7787a 100644
--- a/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java
+++ b/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.druid.timeline.DataSegment;

+import java.util.List;
 import java.util.Set;

 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@@ -32,5 +33,7 @@ import java.util.Set;
 })
 public interface ServerSelectorStrategy
 {
-  public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
+  QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
+
+  List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
 }
diff --git a/server/src/main/java/io/druid/client/selector/TierSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/TierSelectorStrategy.java
index 47a9910b8a8..f433fc266b7 100644
--- a/server/src/main/java/io/druid/client/selector/TierSelectorStrategy.java
+++ b/server/src/main/java/io/druid/client/selector/TierSelectorStrategy.java
@@ -22,10 +22,11 @@ package io.druid.client.selector;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

 import java.util.Comparator;
+import java.util.List;
 import java.util.Set;
-import java.util.TreeMap;

 /**
  */
@@ -37,7 +38,13 @@ import java.util.TreeMap;
 })
 public interface TierSelectorStrategy
 {
-  public Comparator<Integer> getComparator();
+  Comparator<Integer> getComparator();

-  public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
+  QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
+
+  List<QueryableDruidServer> pick(
+      Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+      DataSegment segment,
+      int numServersToPick
+  );
 }
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index bd46eeeca26..6aa88d4b6ed 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -40,6 +40,7 @@ import io.druid.timeline.DataSegment;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.SingleElementPartitionChunk;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -47,17 +48,18 @@ import org.junit.Before;
 import org.junit.Test;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Executor;

 /**
  */
-public class CachingClusteredClientFunctionalityTest {
+public class CachingClusteredClientFunctionalityTest
+{

   public CachingClusteredClient client;

@@ -75,17 +77,22 @@ public class CachingClusteredClientFunctionalityTest {
   }

   @Test
-  public void testUncoveredInterval() throws Exception {
+  public void testUncoveredInterval() throws Exception
+  {
     addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
     addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
     addToTimeline(new Interval("2015-02-04/2015-02-05"), "1");

     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
-        .dataSource("test")
-        .intervals("2015-01-02/2015-01-03")
-        .granularity("day")
-        .aggregators(Arrays.asList(new CountAggregatorFactory("rows")))
-        .context(ImmutableMap.of("uncoveredIntervalsLimit", 3));
+                                                        .dataSource("test")
+                                                        .intervals("2015-01-02/2015-01-03")
+                                                        .granularity("day")
+                                                        .aggregators(Arrays.asList(new CountAggregatorFactory(
+                                                            "rows")))
+                                                        .context(ImmutableMap.of(
+                                                            "uncoveredIntervalsLimit",
+                                                            3
+                                                        ));

     Map<String, Object> responseContext = new HashMap<>();
     client.run(builder.build(), responseContext);
@@ -132,7 +139,8 @@ public class CachingClusteredClientFunctionalityTest {
     assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
   }

-  private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals) {
+  private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals)
+  {
     List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
     for (String interval : intervals) {
       expectedList.add(new Interval(interval));
@@ -141,28 +149,49 @@ public class CachingClusteredClientFunctionalityTest {
     Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed"));
   }

-  private void addToTimeline(Interval interval, String version) {
+  private void addToTimeline(Interval interval, String version)
+  {
     timeline.add(interval, version, new SingleElementPartitionChunk<>(
         new ServerSelector(
             DataSegment.builder()
-                .dataSource("test")
-                .interval(interval)
-                .version(version)
-                .shardSpec(NoneShardSpec.instance())
-                .build(),
-            new TierSelectorStrategy() {
+                       .dataSource("test")
+                       .interval(interval)
+                       .version(version)
+                       .shardSpec(NoneShardSpec.instance())
+                       .build(),
+            new TierSelectorStrategy()
+            {
               @Override
-              public Comparator<Integer> getComparator() {
+              public Comparator<Integer> getComparator()
+              {
                 return Ordering.natural();
               }

               @Override
-              public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment) {
+              public QueryableDruidServer pick(
+                  Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
+              )
+              {
                 return new QueryableDruidServer(
                     new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10),
                     EasyMock.createNiceMock(DirectDruidClient.class)
                 );
               }
+
+              @Override
+              public List<QueryableDruidServer> pick(
+                  Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
+                  DataSegment segment,
+                  int numServersToPick
+              )
+              {
+                return Collections.singletonList(
+                    new QueryableDruidServer(
+                        new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10),
+                        EasyMock.createNiceMock(DirectDruidClient.class)
+                    )
+                );
+              }
             }
         )
     ));
diff --git a/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java b/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java
index 0dc42790694..ffad1bd7fac 100644
--- a/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java
+++ b/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java
@@ -21,17 +21,29 @@ package io.druid.client.selector;

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.druid.client.DirectDruidClient;
+import io.druid.client.DruidServer;
+import io.druid.server.coordination.ServerType;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;

 /**
  */
 public class ServerSelectorTest
 {
+  TierSelectorStrategy tierSelectorStrategy;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    tierSelectorStrategy = EasyMock.createMock(TierSelectorStrategy.class);
+    EasyMock.expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes();
+  }

   @Test
   public void testSegmentUpdate() throws Exception
@@ -59,7 +71,10 @@ public class ServerSelectorTest
     );

     selector.addServerAndUpdateSegment(
-        EasyMock.createMock(QueryableDruidServer.class),
+        new QueryableDruidServer(
+            new DruidServer("test1", "localhost", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 1),
+            EasyMock.createMock(DirectDruidClient.class)
+        ),
         DataSegment.builder()
                    .dataSource(
                        "test_broker_server_view")
diff --git a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java
index c379c3f7078..6818f625c12 100644
--- a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java
+++ b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java
@@ -43,6 +43,8 @@ import com.google.common.collect.Ordering;
 import io.druid.client.DruidServer;
 import io.druid.client.FilteredServerInventoryView;
 import io.druid.client.TimelineServerView;
+import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import io.druid.client.selector.RandomServerSelectorStrategy;
 import io.druid.client.selector.ServerSelector;
 import io.druid.query.TableDataSource;
 import io.druid.query.metadata.SegmentMetadataQueryConfig;
@@ -378,7 +380,7 @@ public class ClientInfoResourceTest
         .size(1)
         .build();
     server.addDataSegment(segment.getIdentifier(), segment);
-    ServerSelector ss = new ServerSelector(segment, null);
+    ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
     timeline.add(new Interval(interval), version, new SingleElementPartitionChunk<ServerSelector>(ss));
   }

@@ -402,7 +404,7 @@ public class ClientInfoResourceTest
         .size(1)
         .build();
     server.addDataSegment(segment.getIdentifier(), segment);
-    ServerSelector ss = new ServerSelector(segment, null);
+    ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
     timeline.add(new Interval(interval), version, shardSpec.createChunk(ss));
   }
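
Usage note (illustrative only, not part of the patch): the sketch below shows how the reworked selection API might be exercised by a caller, modeled on ServerSelectorTest and ClientInfoResourceTest above. The ServerSelectorUsageSketch class, the server names, host:port values and priorities are invented for the example; ServerSelector, QueryableDruidServer, HighestPriorityTierSelectorStrategy, ConnectionCountServerSelectorStrategy, addServerAndUpdateSegment(), pick() and getCandidates(int) are the types and methods touched by this diff.

import io.druid.client.DirectDruidClient;
import io.druid.client.DruidServer;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;

import java.util.List;

public class ServerSelectorUsageSketch
{
  public static void main(String[] args)
  {
    // One selector per segment; after this patch it keeps servers grouped by priority
    // in an Int2ObjectRBTreeMap ordered by the tier strategy's comparator.
    ServerSelector selector = new ServerSelector(
        DataSegment.builder()
                   .dataSource("test")
                   .interval(new Interval("2015-01-02/2015-01-03"))
                   .version("1")
                   .shardSpec(NoneShardSpec.instance())
                   .build(),
        new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
    );

    // Two servers with different priorities; DirectDruidClient is mocked as in the tests above.
    QueryableDruidServer lowPriority = new QueryableDruidServer(
        new DruidServer("low", "localhost:8081", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
        EasyMock.createNiceMock(DirectDruidClient.class)
    );
    QueryableDruidServer highPriority = new QueryableDruidServer(
        new DruidServer("high", "localhost:8082", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 1),
        EasyMock.createNiceMock(DirectDruidClient.class)
    );
    selector.addServerAndUpdateSegment(lowPriority, selector.getSegment());
    selector.addServerAndUpdateSegment(highPriority, selector.getSegment());

    // pick() now delegates to pick(prioritizedServers, segment, 1); getCandidates(n) walks
    // the priority tiers in order and asks the server strategy for at most n servers in
    // total, instead of rebuilding a TreeMap of tiers on every call.
    QueryableDruidServer single = selector.pick();
    List<DruidServerMetadata> candidates = selector.getCandidates(2);

    System.out.println("picked: " + single.getServer().getMetadata());
    System.out.println("candidates: " + candidates);
  }
}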