mirror of https://github.com/apache/druid.git
Use historical node instead of realtime for querying (#4764)
* Use historical node instead of realtime for querying * Incorporated code review comments * Incorporate code review comments * Remove artifact comment * Consider non-historical nodes as realtime
This commit is contained in:
parent
0eae89170e
commit
0982472c90
|
@ -21,7 +21,6 @@ package io.druid.client.selector;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Ints;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -31,14 +30,8 @@ import java.util.Set;
|
|||
|
||||
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
|
||||
{
|
||||
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
|
||||
{
|
||||
@Override
|
||||
public int compare(QueryableDruidServer left, QueryableDruidServer right)
|
||||
{
|
||||
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
|
||||
}
|
||||
};
|
||||
private static final Comparator<QueryableDruidServer> comparator =
|
||||
Comparator.comparingInt(s -> s.getClient().getNumOpenConnections());
|
||||
|
||||
@Override
|
||||
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
|
||||
|
|
|
@ -20,23 +20,26 @@
|
|||
package io.druid.client.selector;
|
||||
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.coordination.ServerType;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
||||
{
|
||||
|
||||
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;
|
||||
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> historicalServers;
|
||||
|
||||
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> realtimeServers;
|
||||
|
||||
private final TierSelectorStrategy strategy;
|
||||
|
||||
|
@ -49,7 +52,8 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
{
|
||||
this.segment = new AtomicReference<>(segment);
|
||||
this.strategy = strategy;
|
||||
this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
|
||||
this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
|
||||
this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
|
||||
}
|
||||
|
||||
public DataSegment getSegment()
|
||||
|
@ -63,8 +67,18 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
{
|
||||
synchronized (this) {
|
||||
this.segment.set(segment);
|
||||
int priority = server.getServer().getPriority();
|
||||
Set<QueryableDruidServer> priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>());
|
||||
Set<QueryableDruidServer> priorityServers;
|
||||
if (server.getServer().getType() == ServerType.HISTORICAL) {
|
||||
priorityServers = historicalServers.computeIfAbsent(
|
||||
server.getServer().getPriority(),
|
||||
p -> new HashSet<>()
|
||||
);
|
||||
} else {
|
||||
priorityServers = realtimeServers.computeIfAbsent(
|
||||
server.getServer().getPriority(),
|
||||
p -> new HashSet<>()
|
||||
);
|
||||
}
|
||||
priorityServers.add(server);
|
||||
}
|
||||
}
|
||||
|
@ -72,12 +86,23 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
public boolean removeServer(QueryableDruidServer server)
|
||||
{
|
||||
synchronized (this) {
|
||||
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;
|
||||
Set<QueryableDruidServer> priorityServers;
|
||||
int priority = server.getServer().getPriority();
|
||||
Set<QueryableDruidServer> priorityServers = servers.get(priority);
|
||||
if (server.getServer().getType() == ServerType.HISTORICAL) {
|
||||
servers = historicalServers;
|
||||
priorityServers = historicalServers.get(priority);
|
||||
} else {
|
||||
servers = realtimeServers;
|
||||
priorityServers = realtimeServers.get(priority);
|
||||
}
|
||||
|
||||
if (priorityServers == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean result = priorityServers.remove(server);
|
||||
|
||||
if (priorityServers.isEmpty()) {
|
||||
servers.remove(priority);
|
||||
}
|
||||
|
@ -88,35 +113,61 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
public boolean isEmpty()
|
||||
{
|
||||
synchronized (this) {
|
||||
return servers.isEmpty();
|
||||
return historicalServers.isEmpty() && realtimeServers.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
public List<DruidServerMetadata> getCandidates(final int numCandidates)
|
||||
{
|
||||
List<DruidServerMetadata> candidates;
|
||||
synchronized (this) {
|
||||
if (numCandidates > 0) {
|
||||
return strategy.pick(servers, segment.get(), numCandidates)
|
||||
.stream()
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.collect(Collectors.toList());
|
||||
candidates = new ArrayList<>(numCandidates);
|
||||
strategy.pick(historicalServers, segment.get(), numCandidates)
|
||||
.stream()
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.forEach(candidates::add);
|
||||
|
||||
if (candidates.size() < numCandidates) {
|
||||
strategy.pick(realtimeServers, segment.get(), numCandidates - candidates.size())
|
||||
.stream()
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.forEach(candidates::add);
|
||||
}
|
||||
return candidates;
|
||||
} else {
|
||||
// return all servers as candidates
|
||||
return servers.values()
|
||||
.stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.collect(Collectors.toList());
|
||||
return getAllServers();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<DruidServerMetadata> getAllServers()
|
||||
{
|
||||
List<DruidServerMetadata> servers = new ArrayList<>();
|
||||
historicalServers.values()
|
||||
.stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.forEach(servers::add);
|
||||
|
||||
realtimeServers.values()
|
||||
.stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(server -> server.getServer().getMetadata())
|
||||
.forEach(servers::add);
|
||||
|
||||
return servers;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public QueryableDruidServer pick()
|
||||
{
|
||||
synchronized (this) {
|
||||
return strategy.pick(servers, segment.get());
|
||||
if (!historicalServers.isEmpty()) {
|
||||
return strategy.pick(historicalServers, segment.get());
|
||||
}
|
||||
return strategy.pick(realtimeServers, segment.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
|
||||
import io.druid.client.DirectDruidClient;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DirectClientQuerySegmentWalker implements QuerySegmentWalker
|
||||
{
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final DirectDruidClient baseClient;
|
||||
|
||||
public DirectClientQuerySegmentWalker(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
DirectDruidClient baseClient
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.baseClient = baseClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
return makeRunner(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
{
|
||||
return makeRunner(query);
|
||||
}
|
||||
|
||||
private <T> FinalizeResultsQueryRunner<T> makeRunner(final Query<T> query)
|
||||
{
|
||||
return new FinalizeResultsQueryRunner<T>(baseClient, warehouse.getToolChest(query));
|
||||
}
|
||||
}
|
|
@ -130,8 +130,7 @@ public class BrokerServerViewTest extends CuratorTestBase
|
|||
Assert.assertTrue(actualPartitionHolder.isComplete());
|
||||
Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
|
||||
|
||||
ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
|
||||
.next()).getObject();
|
||||
ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject();
|
||||
Assert.assertFalse(selector.isEmpty());
|
||||
Assert.assertEquals(segment, selector.getSegment());
|
||||
Assert.assertEquals(druidServer, selector.pick().getServer());
|
||||
|
|
|
@ -153,7 +153,7 @@ public class DirectDruidClientTest
|
|||
);
|
||||
|
||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
|
||||
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
|
||||
client1
|
||||
);
|
||||
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.easymock.EasyMock.expect;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerSelectorTest
|
||||
|
@ -42,7 +44,7 @@ public class ServerSelectorTest
|
|||
public void setUp()
|
||||
{
|
||||
tierSelectorStrategy = EasyMock.createMock(TierSelectorStrategy.class);
|
||||
EasyMock.expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes();
|
||||
expect(tierSelectorStrategy.getComparator()).andReturn(Integer::compare).anyTimes();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -53,7 +55,7 @@ public class ServerSelectorTest
|
|||
.dataSource("test_broker_server_view")
|
||||
.interval(Intervals.of("2012/2013"))
|
||||
.loadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
ImmutableMap.of(
|
||||
"type",
|
||||
"local",
|
||||
"path",
|
||||
|
@ -61,13 +63,13 @@ public class ServerSelectorTest
|
|||
)
|
||||
)
|
||||
.version("v1")
|
||||
.dimensions(ImmutableList.<String>of())
|
||||
.metrics(ImmutableList.<String>of())
|
||||
.dimensions(ImmutableList.of())
|
||||
.metrics(ImmutableList.of())
|
||||
.shardSpec(NoneShardSpec.instance())
|
||||
.binaryVersion(9)
|
||||
.size(0)
|
||||
.build(),
|
||||
EasyMock.createMock(TierSelectorStrategy.class)
|
||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
|
||||
);
|
||||
|
||||
selector.addServerAndUpdateSegment(
|
||||
|
@ -80,7 +82,7 @@ public class ServerSelectorTest
|
|||
"test_broker_server_view")
|
||||
.interval(Intervals.of("2012/2013"))
|
||||
.loadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
ImmutableMap.of(
|
||||
"type",
|
||||
"local",
|
||||
"path",
|
||||
|
@ -89,13 +91,13 @@ public class ServerSelectorTest
|
|||
)
|
||||
.version("v1")
|
||||
.dimensions(
|
||||
ImmutableList.<String>of(
|
||||
ImmutableList.of(
|
||||
"a",
|
||||
"b",
|
||||
"c"
|
||||
))
|
||||
.metrics(
|
||||
ImmutableList.<String>of())
|
||||
ImmutableList.of())
|
||||
.shardSpec(NoneShardSpec.instance())
|
||||
.binaryVersion(9)
|
||||
.size(0)
|
||||
|
|
Loading…
Reference in New Issue