mirror of https://github.com/apache/druid.git
Fix Router race condition and use default broker service name for invalid priority (#5050)
* use default brokerServiceName when priority is not valid * use AtomicInteger for NodesHolder.roundRobinIndex * revert inspectionProfiles change * adjust TieredBrokerHostSelectorTest * combine if statements and ensure index does not become negative * set next index with mod if overflows * fix codestyle * use nextIndex * extract the while loop to a method
This commit is contained in:
parent
a8dc056c09
commit
1bf253f6e6
|
@ -22,7 +22,6 @@ package io.druid.server.router;
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryContexts;
|
import io.druid.query.QueryContexts;
|
||||||
|
|
||||||
|
@ -48,19 +47,9 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto
|
||||||
{
|
{
|
||||||
final int priority = QueryContexts.getPriority(query);
|
final int priority = QueryContexts.getPriority(query);
|
||||||
|
|
||||||
if (priority < minPriority) {
|
if (priority < minPriority || priority > maxPriority) {
|
||||||
return Optional.of(
|
return Optional.of(
|
||||||
Iterables.getLast(
|
tierConfig.getDefaultBrokerServiceName()
|
||||||
tierConfig.getTierToBrokerMap().values(),
|
|
||||||
tierConfig.getDefaultBrokerServiceName()
|
|
||||||
)
|
|
||||||
);
|
|
||||||
} else if (priority >= maxPriority) {
|
|
||||||
return Optional.of(
|
|
||||||
Iterables.getFirst(
|
|
||||||
tierConfig.getTierToBrokerMap().values(),
|
|
||||||
tierConfig.getDefaultBrokerServiceName()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -277,7 +278,7 @@ public class TieredBrokerHostSelector<T>
|
||||||
|
|
||||||
private static class NodesHolder
|
private static class NodesHolder
|
||||||
{
|
{
|
||||||
private int roundRobinIndex = 0;
|
private AtomicInteger roundRobinIndex = new AtomicInteger(-1);
|
||||||
|
|
||||||
private Map<String, Server> nodesMap = new HashMap<>();
|
private Map<String, Server> nodesMap = new HashMap<>();
|
||||||
private ImmutableList<Server> nodes = ImmutableList.of();
|
private ImmutableList<Server> nodes = ImmutableList.of();
|
||||||
|
@ -312,11 +313,21 @@ public class TieredBrokerHostSelector<T>
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (roundRobinIndex >= currNodes.size()) {
|
return currNodes.get(getIndex(currNodes));
|
||||||
roundRobinIndex %= currNodes.size();
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return currNodes.get(roundRobinIndex++);
|
int getIndex(ImmutableList<Server> currNodes)
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
int index = roundRobinIndex.get();
|
||||||
|
int nextIndex = index + 1;
|
||||||
|
if (nextIndex >= currNodes.size()) {
|
||||||
|
nextIndex = 0;
|
||||||
|
}
|
||||||
|
if (roundRobinIndex.compareAndSet(index, nextIndex)) {
|
||||||
|
return nextIndex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -265,7 +265,7 @@ public class TieredBrokerHostSelectorTest
|
||||||
.build()
|
.build()
|
||||||
).lhs;
|
).lhs;
|
||||||
|
|
||||||
Assert.assertEquals("coldBroker", brokerName);
|
Assert.assertEquals("hotBroker", brokerName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue