From 1bf253f6e618e9152ebb38d17781c299b54dfd3e Mon Sep 17 00:00:00 2001 From: Jisoo Kim Date: Thu, 9 Nov 2017 07:31:18 -0800 Subject: [PATCH] Fix Router race condition and use default broker service name for invalid priority (#5050) * use default brokerServiceName when priority is not valid * use AtomicInteger for NodesHolder.roundRobinIndex * revert inspectionProfiles change * adjust TieredBrokerHostSelectorTest * combine if statements and ensure index does not become negative * set next index with mod if overflows * fix codestyle * use nextIndex * extract the while loop to a method --- .../PriorityTieredBrokerSelectorStrategy.java | 15 ++----------- .../router/TieredBrokerHostSelector.java | 21 ++++++++++++++----- .../router/TieredBrokerHostSelectorTest.java | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index c4af26e0b10..283f08fcd0b 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -22,7 +22,6 @@ package io.druid.server.router; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; -import com.google.common.collect.Iterables; import io.druid.query.Query; import io.druid.query.QueryContexts; @@ -48,19 +47,9 @@ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelecto { final int priority = QueryContexts.getPriority(query); - if (priority < minPriority) { + if (priority < minPriority || priority > maxPriority) { return Optional.of( - Iterables.getLast( - tierConfig.getTierToBrokerMap().values(), - tierConfig.getDefaultBrokerServiceName() - ) - ); - } else if (priority >= maxPriority) { - return Optional.of( - Iterables.getFirst( - tierConfig.getTierToBrokerMap().values(), - tierConfig.getDefaultBrokerServiceName() - ) + tierConfig.getDefaultBrokerServiceName() ); } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index f82d8a3e4a0..f52151fa23d 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -277,7 +278,7 @@ public class TieredBrokerHostSelector private static class NodesHolder { - private int roundRobinIndex = 0; + private AtomicInteger roundRobinIndex = new AtomicInteger(-1); private Map nodesMap = new HashMap<>(); private ImmutableList nodes = ImmutableList.of(); @@ -312,11 +313,21 @@ public class TieredBrokerHostSelector return null; } - if (roundRobinIndex >= currNodes.size()) { - roundRobinIndex %= currNodes.size(); - } + return currNodes.get(getIndex(currNodes)); + } - return currNodes.get(roundRobinIndex++); + int getIndex(ImmutableList currNodes) + { + while (true) { + int index = roundRobinIndex.get(); + int nextIndex = index + 1; + if (nextIndex >= currNodes.size()) { + nextIndex = 0; + } + if (roundRobinIndex.compareAndSet(index, nextIndex)) { + return nextIndex; + } + } } } } diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 36f83f001aa..81e0d97d793 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -265,7 +265,7 @@ public class TieredBrokerHostSelectorTest .build() ).lhs; - Assert.assertEquals("coldBroker", brokerName); + Assert.assertEquals("hotBroker", brokerName); } @Test