diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index 8ff4f6a4a23..de6d3824f1c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.commons.math3.util.FastMath; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -36,6 +37,8 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; public class CostBalancerStrategy implements BalancerStrategy @@ -226,7 +229,7 @@ public class CostBalancerStrategy implements BalancerStrategy try { // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server - List> results = resultsFuture.get(); + List> results = resultsFuture.get(1, TimeUnit.MINUTES); return results.stream() // Comparator.comapringDouble will order by lowest cost... // reverse it because we want to drop from the highest cost servers first @@ -235,7 +238,7 @@ public class CostBalancerStrategy implements BalancerStrategy .iterator(); } catch (Exception e) { - log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); + alertOnFailure(e, "pick drop server"); } return Collections.emptyIterator(); } @@ -298,10 +301,7 @@ public class CostBalancerStrategy implements BalancerStrategy log.info( "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", - tier, - initialTotalCost, - normalization, - normalizedInitialCost + tier, initialTotalCost, normalization, normalizedInitialCost ); } @@ -373,7 +373,7 @@ public class CostBalancerStrategy implements BalancerStrategy final List> bestServers = new ArrayList<>(); bestServers.add(bestServer); try { - for (Pair server : resultsFuture.get()) { + for (Pair server : resultsFuture.get(1, TimeUnit.MINUTES)) { if (server.lhs <= bestServers.get(0).lhs) { if (server.lhs < bestServers.get(0).lhs) { bestServers.clear(); @@ -390,9 +390,28 @@ public class CostBalancerStrategy implements BalancerStrategy bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size())); } catch (Exception e) { - log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); + alertOnFailure(e, "choose best load server"); } return bestServer; } + + private void alertOnFailure(Exception e, String action) + { + // Do not alert if the executor has been shutdown + if (exec.isShutdown()) { + log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action); + return; + } + + final boolean hasTimedOut = e instanceof TimeoutException; + + final String message = StringUtils.format( + "Cost balancer strategy %s in action [%s].%s", + hasTimedOut ? "timed out" : "failed", action, + hasTimedOut ? " Try setting a higher value of 'balancerComputeThreads'." : "" + ); + log.makeAlert(e, message).emit(); + } + } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java index 1b6a0cfdc6b..57735477486 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java @@ -23,6 +23,10 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.timeline.DataSegment; @@ -32,9 +36,11 @@ import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -44,6 +50,7 @@ public class CostBalancerStrategyTest private static final double DELTA = 1e-6; private static final String DS_WIKI = "wiki"; + private StubServiceEmitter serviceEmitter; private ExecutorService balancerExecutor; private CostBalancerStrategy strategy; private int uniqueServerId; @@ -53,6 +60,9 @@ public class CostBalancerStrategyTest { balancerExecutor = new BlockingExecutorService("test-balance-exec-%d"); strategy = new CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor)); + + serviceEmitter = new StubServiceEmitter("test-service", "host"); + EmittingLogger.registerEmitter(serviceEmitter); } @After @@ -302,6 +312,42 @@ public class CostBalancerStrategyTest ); } + @Test + public void testFindServerAfterExecutorShutdownThrowsException() + { + DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI) + .forIntervals(1, Granularities.DAY) + .startingAt("2012-10-24") + .eachOfSizeInMb(100).get(0); + + ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), null); + ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), null); + + balancerExecutor.shutdownNow(); + Assert.assertThrows( + RejectedExecutionException.class, + () -> strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA, serverB)) + ); + } + + @Test(timeout = 90_000L) + public void testFindServerRaisesAlertOnTimeout() + { + DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI) + .forIntervals(1, Granularities.DAY) + .startingAt("2012-10-24") + .eachOfSizeInMb(100).get(0); + + ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), null); + ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), null); + + strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA, serverB)); + + List events = serviceEmitter.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertTrue(events.get(0) instanceof AlertEvent); + } + private void verifyServerCosts( DataSegment segment, List serverHolders, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java index fc59a6bd9d4..111d88bf550 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; /** @@ -48,6 +49,8 @@ public class BlockingExecutorService implements ExecutorService private final String nameFormat; private final Queue> taskQueue = new ConcurrentLinkedQueue<>(); + private boolean isShutdown; + public BlockingExecutorService(String nameFormat) { this.nameFormat = nameFormat; @@ -115,12 +118,14 @@ public class BlockingExecutorService implements ExecutorService @Override public Future submit(Callable task) { + verifyNotShutdown(); return addTaskToQueue(task); } @Override public Future submit(Runnable task, T result) { + verifyNotShutdown(); return addTaskToQueue(() -> { task.run(); return result; @@ -130,6 +135,7 @@ public class BlockingExecutorService implements ExecutorService @Override public Future submit(Runnable task) { + verifyNotShutdown(); return addTaskToQueue(() -> { task.run(); return null; @@ -142,6 +148,13 @@ public class BlockingExecutorService implements ExecutorService submit(command); } + private void verifyNotShutdown() + { + if (isShutdown) { + throw new RejectedExecutionException(); + } + } + private Future addTaskToQueue(Callable callable) { Task task = new Task<>(callable); @@ -153,25 +166,27 @@ public class BlockingExecutorService implements ExecutorService @Override public void shutdown() { + isShutdown = true; taskQueue.clear(); } @Override public List shutdownNow() { + shutdown(); return null; } @Override public boolean isShutdown() { - return false; + return isShutdown; } @Override public boolean isTerminated() { - return false; + return isShutdown && taskQueue.isEmpty(); } @Override