diff --git a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java index 1c525267f7f..718b4d1a0e4 100644 --- a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java +++ b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java @@ -21,7 +21,6 @@ package io.druid.client.selector; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import com.google.common.primitives.Ints; import io.druid.timeline.DataSegment; import java.util.Collections; @@ -31,14 +30,8 @@ import java.util.Set; public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy { - private static final Comparator comparator = new Comparator() - { - @Override - public int compare(QueryableDruidServer left, QueryableDruidServer right) - { - return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections()); - } - }; + private static final Comparator comparator = + Comparator.comparingInt(s -> s.getClient().getNumOpenConnections()); @Override public QueryableDruidServer pick(Set servers, DataSegment segment) diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java index 48617c33200..1466035778c 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java @@ -20,23 +20,26 @@ package io.druid.client.selector; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import java.util.ArrayList; import javax.annotation.Nullable; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; /** */ public class ServerSelector implements DiscoverySelector { - private final Int2ObjectRBTreeMap> servers; + private final Int2ObjectRBTreeMap> historicalServers; + + private final Int2ObjectRBTreeMap> realtimeServers; private final TierSelectorStrategy strategy; @@ -49,7 +52,8 @@ public class ServerSelector implements DiscoverySelector { this.segment = new AtomicReference<>(segment); this.strategy = strategy; - this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); + this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); + this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); } public DataSegment getSegment() @@ -63,8 +67,18 @@ public class ServerSelector implements DiscoverySelector { synchronized (this) { this.segment.set(segment); - int priority = server.getServer().getPriority(); - Set priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>()); + Set priorityServers; + if (server.getServer().getType() == ServerType.HISTORICAL) { + priorityServers = historicalServers.computeIfAbsent( + server.getServer().getPriority(), + p -> new HashSet<>() + ); + } else { + priorityServers = realtimeServers.computeIfAbsent( + server.getServer().getPriority(), + p -> new HashSet<>() + ); + } priorityServers.add(server); } } @@ -72,12 +86,23 @@ public class ServerSelector implements DiscoverySelector public boolean removeServer(QueryableDruidServer server) { synchronized (this) { + Int2ObjectRBTreeMap> servers; + Set priorityServers; int priority = server.getServer().getPriority(); - Set priorityServers = servers.get(priority); + if (server.getServer().getType() == ServerType.HISTORICAL) { + servers = historicalServers; + priorityServers = historicalServers.get(priority); + } else { + servers = realtimeServers; + priorityServers = realtimeServers.get(priority); + } + if (priorityServers == null) { return false; } + boolean result = priorityServers.remove(server); + if (priorityServers.isEmpty()) { servers.remove(priority); } @@ -88,35 +113,61 @@ public class ServerSelector implements DiscoverySelector public boolean isEmpty() { synchronized (this) { - return servers.isEmpty(); + return historicalServers.isEmpty() && realtimeServers.isEmpty(); } } public List getCandidates(final int numCandidates) { + List candidates; synchronized (this) { if (numCandidates > 0) { - return strategy.pick(servers, segment.get(), numCandidates) - .stream() - .map(server -> server.getServer().getMetadata()) - .collect(Collectors.toList()); + candidates = new ArrayList<>(numCandidates); + strategy.pick(historicalServers, segment.get(), numCandidates) + .stream() + .map(server -> server.getServer().getMetadata()) + .forEach(candidates::add); + + if (candidates.size() < numCandidates) { + strategy.pick(realtimeServers, segment.get(), numCandidates - candidates.size()) + .stream() + .map(server -> server.getServer().getMetadata()) + .forEach(candidates::add); + } + return candidates; } else { - // return all servers as candidates - return servers.values() - .stream() - .flatMap(Collection::stream) - .map(server -> server.getServer().getMetadata()) - .collect(Collectors.toList()); + return getAllServers(); } } } + public List getAllServers() + { + List servers = new ArrayList<>(); + historicalServers.values() + .stream() + .flatMap(Collection::stream) + .map(server -> server.getServer().getMetadata()) + .forEach(servers::add); + + realtimeServers.values() + .stream() + .flatMap(Collection::stream) + .map(server -> server.getServer().getMetadata()) + .forEach(servers::add); + + return servers; + } + @Nullable @Override public QueryableDruidServer pick() { synchronized (this) { - return strategy.pick(servers, segment.get()); + if (!historicalServers.isEmpty()) { + return strategy.pick(historicalServers, segment.get()); + } + return strategy.pick(realtimeServers, segment.get()); } } } diff --git a/server/src/main/java/io/druid/server/DirectClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/DirectClientQuerySegmentWalker.java deleted file mode 100644 index 5f810c29a5c..00000000000 --- a/server/src/main/java/io/druid/server/DirectClientQuerySegmentWalker.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server; - -import io.druid.client.DirectDruidClient; -import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; -import io.druid.query.SegmentDescriptor; -import org.joda.time.Interval; - -/** - */ -public class DirectClientQuerySegmentWalker implements QuerySegmentWalker -{ - private final QueryToolChestWarehouse warehouse; - private final DirectDruidClient baseClient; - - public DirectClientQuerySegmentWalker( - QueryToolChestWarehouse warehouse, - DirectDruidClient baseClient - ) - { - this.warehouse = warehouse; - this.baseClient = baseClient; - } - - @Override - public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) - { - return makeRunner(query); - } - - @Override - public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) - { - return makeRunner(query); - } - - private FinalizeResultsQueryRunner makeRunner(final Query query) - { - return new FinalizeResultsQueryRunner(baseClient, warehouse.getToolChest(query)); - } -} diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 1024c3878a1..aa6245c536d 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -130,8 +130,7 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertTrue(actualPartitionHolder.isComplete()); Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); - ServerSelector selector = ((SingleElementPartitionChunk) actualPartitionHolder.iterator() - .next()).getObject(); + ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject(); Assert.assertFalse(selector.isEmpty()); Assert.assertEquals(segment, selector.getSegment()); Assert.assertEquals(druidServer, selector.pick().getServer()); diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 060dac6671c..f6e898e8d7d 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -153,7 +153,7 @@ public class DirectDruidClientTest ); QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( - new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), client1 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); diff --git a/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java b/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java index 56ec39f2add..a66564fa7bc 100644 --- a/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java +++ b/server/src/test/java/io/druid/client/selector/ServerSelectorTest.java @@ -32,6 +32,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.expect; + /** */ public class ServerSelectorTest @@ -42,7 +44,7 @@ public class ServerSelectorTest public void setUp() { tierSelectorStrategy = EasyMock.createMock(TierSelectorStrategy.class); - EasyMock.expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes(); + expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes(); } @Test @@ -53,7 +55,7 @@ public class ServerSelectorTest .dataSource("test_broker_server_view") .interval(Intervals.of("2012/2013")) .loadSpec( - ImmutableMap.of( + ImmutableMap.of( "type", "local", "path", @@ -61,13 +63,13 @@ public class ServerSelectorTest ) ) .version("v1") - .dimensions(ImmutableList.of()) - .metrics(ImmutableList.of()) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) .shardSpec(NoneShardSpec.instance()) .binaryVersion(9) .size(0) .build(), - EasyMock.createMock(TierSelectorStrategy.class) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) ); selector.addServerAndUpdateSegment( @@ -80,7 +82,7 @@ public class ServerSelectorTest "test_broker_server_view") .interval(Intervals.of("2012/2013")) .loadSpec( - ImmutableMap.of( + ImmutableMap.of( "type", "local", "path", @@ -89,13 +91,13 @@ public class ServerSelectorTest ) .version("v1") .dimensions( - ImmutableList.of( + ImmutableList.of( "a", "b", "c" )) .metrics( - ImmutableList.of()) + ImmutableList.of()) .shardSpec(NoneShardSpec.instance()) .binaryVersion(9) .size(0)