mirror of https://github.com/apache/druid.git
add custom tier selector
This commit is contained in:
parent
9de170540b
commit
e90eb4b83a
|
@ -22,6 +22,7 @@ package io.druid.client.selector;
|
|||
import com.metamx.common.ISE;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
@ -37,14 +38,12 @@ public abstract class AbstractTierSelectorStrategy implements TierSelectorStrate
|
|||
this.serverSelectorStrategy = serverSelectorStrategy;
|
||||
}
|
||||
|
||||
public abstract Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers);
|
||||
|
||||
@Override
|
||||
public QueryableDruidServer pick(
|
||||
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
|
||||
)
|
||||
{
|
||||
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = getServers(prioritizedServers);
|
||||
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry();
|
||||
|
||||
if (priorityServers == null) {
|
||||
return null;
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package io.druid.client.selector;
|
||||
|
||||
import com.google.api.client.util.Maps;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CustomTierSelectorStrategy extends AbstractTierSelectorStrategy
|
||||
{
|
||||
private final Comparator<Integer> comparator;
|
||||
|
||||
@Inject
|
||||
public CustomTierSelectorStrategy(
|
||||
ServerSelectorStrategy serverSelectorStrategy,
|
||||
CustomTierSelectorStrategyConfig config
|
||||
)
|
||||
{
|
||||
super(serverSelectorStrategy);
|
||||
|
||||
final Map<Integer, Integer> lookup = Maps.newHashMap();
|
||||
int pos = 0;
|
||||
for (Integer integer : config.getPriorities()) {
|
||||
lookup.put(integer, pos);
|
||||
pos++;
|
||||
}
|
||||
|
||||
this.comparator = new Comparator<Integer>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Integer o1, Integer o2)
|
||||
{
|
||||
int pos1 = lookup.get(o1);
|
||||
int pos2 = lookup.get(o2);
|
||||
return Ints.compare(pos1, pos2);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Integer> getComparator()
|
||||
{
|
||||
return comparator;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.api.client.util.Lists;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CustomTierSelectorStrategyConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private List<Integer> priorities = Lists.newArrayList();
|
||||
|
||||
public List<Integer> getPriorities()
|
||||
{
|
||||
return priorities;
|
||||
}
|
||||
}
|
|
@ -19,16 +19,24 @@
|
|||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HighestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
|
||||
{
|
||||
private static final Comparator<Integer> comparator = new Comparator<Integer>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Integer o1, Integer o2)
|
||||
{
|
||||
return -Ints.compare(o1, o2);
|
||||
}
|
||||
};
|
||||
|
||||
@Inject
|
||||
public HighestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
|
||||
{
|
||||
|
@ -36,8 +44,8 @@ public class HighestPriorityTierSelectorStrategy extends AbstractTierSelectorStr
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers)
|
||||
public Comparator<Integer> getComparator()
|
||||
{
|
||||
return prioritizedServers.pollLastEntry();
|
||||
return comparator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,16 +19,24 @@
|
|||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class LowestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
|
||||
{
|
||||
private static final Comparator<Integer> comparator = new Comparator<Integer>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Integer o1, Integer o2)
|
||||
{
|
||||
return Ints.compare(o1, o2);
|
||||
}
|
||||
};
|
||||
|
||||
@Inject
|
||||
public LowestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
|
||||
{
|
||||
|
@ -36,8 +44,8 @@ public class LowestPriorityTierSelectorStrategy extends AbstractTierSelectorStra
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers)
|
||||
public Comparator<Integer> getComparator()
|
||||
{
|
||||
return prioritizedServers.pollFirstEntry();
|
||||
return comparator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
public QueryableDruidServer pick()
|
||||
{
|
||||
synchronized (this) {
|
||||
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = Maps.newTreeMap();
|
||||
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
|
||||
for (QueryableDruidServer server : servers) {
|
||||
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
|
||||
if (theServers == null) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
|
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
|
@ -31,9 +32,12 @@ import java.util.TreeMap;
|
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HighestPriorityTierSelectorStrategy.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "highestPriority", value = HighestPriorityTierSelectorStrategy.class),
|
||||
@JsonSubTypes.Type(name = "lowestPriority", value = LowestPriorityTierSelectorStrategy.class)
|
||||
@JsonSubTypes.Type(name = "lowestPriority", value = LowestPriorityTierSelectorStrategy.class),
|
||||
@JsonSubTypes.Type(name = "custom", value = CustomTierSelectorStrategy.class)
|
||||
})
|
||||
public interface TierSelectorStrategy
|
||||
{
|
||||
public Comparator<Integer> getComparator();
|
||||
|
||||
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.client.cache.Cache;
|
|||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.CacheMonitor;
|
||||
import io.druid.client.cache.CacheProvider;
|
||||
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
|
||||
import io.druid.client.selector.ServerSelectorStrategy;
|
||||
import io.druid.client.selector.TierSelectorStrategy;
|
||||
import io.druid.curator.discovery.DiscoveryModule;
|
||||
|
@ -85,6 +86,7 @@ public class CliBroker extends ServerRunnable
|
|||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.tier", TierSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.server", ServerSelectorStrategy.class);
|
||||
|
||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
|
Loading…
Reference in New Issue