Prevent coordinator from getting stuck if leadership changes during coordinator run (#14385)

Changes:
- Add a timeout of 1 minute to resultFuture.get() in `CostBalancerStrategy.chooseBestServer`.
1 minute is the typical time for a full coordinator run and is more than enough time for cost
computations of a single segment.
- Raise an alert if an exception is encountered while computing costs and if the executor has
not been shutdown. This is because a shutdown is intentional and does not require an alert.
This commit is contained in:
Kashif Faraz 2023-06-08 15:29:20 +05:30 committed by GitHub
parent 6a4cbab4b8
commit 12e8fa5c97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 10 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.math3.util.FastMath; import org.apache.commons.math3.util.FastMath;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -36,6 +37,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy public class CostBalancerStrategy implements BalancerStrategy
@ -226,7 +229,7 @@ public class CostBalancerStrategy implements BalancerStrategy
try { try {
// results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server
List<Pair<Double, ServerHolder>> results = resultsFuture.get(); List<Pair<Double, ServerHolder>> results = resultsFuture.get(1, TimeUnit.MINUTES);
return results.stream() return results.stream()
// Comparator.comapringDouble will order by lowest cost... // Comparator.comapringDouble will order by lowest cost...
// reverse it because we want to drop from the highest cost servers first // reverse it because we want to drop from the highest cost servers first
@ -235,7 +238,7 @@ public class CostBalancerStrategy implements BalancerStrategy
.iterator(); .iterator();
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); alertOnFailure(e, "pick drop server");
} }
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
@ -298,10 +301,7 @@ public class CostBalancerStrategy implements BalancerStrategy
log.info( log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
tier, tier, initialTotalCost, normalization, normalizedInitialCost
initialTotalCost,
normalization,
normalizedInitialCost
); );
} }
@ -373,7 +373,7 @@ public class CostBalancerStrategy implements BalancerStrategy
final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>(); final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
bestServers.add(bestServer); bestServers.add(bestServer);
try { try {
for (Pair<Double, ServerHolder> server : resultsFuture.get()) { for (Pair<Double, ServerHolder> server : resultsFuture.get(1, TimeUnit.MINUTES)) {
if (server.lhs <= bestServers.get(0).lhs) { if (server.lhs <= bestServers.get(0).lhs) {
if (server.lhs < bestServers.get(0).lhs) { if (server.lhs < bestServers.get(0).lhs) {
bestServers.clear(); bestServers.clear();
@ -390,9 +390,28 @@ public class CostBalancerStrategy implements BalancerStrategy
bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size())); bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); alertOnFailure(e, "choose best load server");
} }
return bestServer; return bestServer;
} }
private void alertOnFailure(Exception e, String action)
{
// Do not alert if the executor has been shutdown
if (exec.isShutdown()) {
log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action);
return;
}
final boolean hasTimedOut = e instanceof TimeoutException;
final String message = StringUtils.format(
"Cost balancer strategy %s in action [%s].%s",
hasTimedOut ? "timed out" : "failed", action,
hasTimedOut ? " Try setting a higher value of 'balancerComputeThreads'." : ""
);
log.makeAlert(e, message).emit();
}
} }

View File

@ -23,6 +23,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer; import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -32,9 +36,11 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -44,6 +50,7 @@ public class CostBalancerStrategyTest
private static final double DELTA = 1e-6; private static final double DELTA = 1e-6;
private static final String DS_WIKI = "wiki"; private static final String DS_WIKI = "wiki";
private StubServiceEmitter serviceEmitter;
private ExecutorService balancerExecutor; private ExecutorService balancerExecutor;
private CostBalancerStrategy strategy; private CostBalancerStrategy strategy;
private int uniqueServerId; private int uniqueServerId;
@ -53,6 +60,9 @@ public class CostBalancerStrategyTest
{ {
balancerExecutor = new BlockingExecutorService("test-balance-exec-%d"); balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
strategy = new CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor)); strategy = new CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
serviceEmitter = new StubServiceEmitter("test-service", "host");
EmittingLogger.registerEmitter(serviceEmitter);
} }
@After @After
@ -302,6 +312,42 @@ public class CostBalancerStrategyTest
); );
} }
@Test
public void testFindServerAfterExecutorShutdownThrowsException()
{
DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.DAY)
.startingAt("2012-10-24")
.eachOfSizeInMb(100).get(0);
ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), null);
ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), null);
balancerExecutor.shutdownNow();
Assert.assertThrows(
RejectedExecutionException.class,
() -> strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA, serverB))
);
}
@Test(timeout = 90_000L)
public void testFindServerRaisesAlertOnTimeout()
{
DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.DAY)
.startingAt("2012-10-24")
.eachOfSizeInMb(100).get(0);
ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), null);
ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), null);
strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA, serverB));
List<Event> events = serviceEmitter.getEvents();
Assert.assertEquals(1, events.size());
Assert.assertTrue(events.get(0) instanceof AlertEvent);
}
private void verifyServerCosts( private void verifyServerCosts(
DataSegment segment, DataSegment segment,
List<ServerHolder> serverHolders, List<ServerHolder> serverHolders,

View File

@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -48,6 +49,8 @@ public class BlockingExecutorService implements ExecutorService
private final String nameFormat; private final String nameFormat;
private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>(); private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>();
private boolean isShutdown;
public BlockingExecutorService(String nameFormat) public BlockingExecutorService(String nameFormat)
{ {
this.nameFormat = nameFormat; this.nameFormat = nameFormat;
@ -115,12 +118,14 @@ public class BlockingExecutorService implements ExecutorService
@Override @Override
public <T> Future<T> submit(Callable<T> task) public <T> Future<T> submit(Callable<T> task)
{ {
verifyNotShutdown();
return addTaskToQueue(task); return addTaskToQueue(task);
} }
@Override @Override
public <T> Future<T> submit(Runnable task, T result) public <T> Future<T> submit(Runnable task, T result)
{ {
verifyNotShutdown();
return addTaskToQueue(() -> { return addTaskToQueue(() -> {
task.run(); task.run();
return result; return result;
@ -130,6 +135,7 @@ public class BlockingExecutorService implements ExecutorService
@Override @Override
public Future<?> submit(Runnable task) public Future<?> submit(Runnable task)
{ {
verifyNotShutdown();
return addTaskToQueue(() -> { return addTaskToQueue(() -> {
task.run(); task.run();
return null; return null;
@ -142,6 +148,13 @@ public class BlockingExecutorService implements ExecutorService
submit(command); submit(command);
} }
private void verifyNotShutdown()
{
if (isShutdown) {
throw new RejectedExecutionException();
}
}
private <T> Future<T> addTaskToQueue(Callable<T> callable) private <T> Future<T> addTaskToQueue(Callable<T> callable)
{ {
Task<T> task = new Task<>(callable); Task<T> task = new Task<>(callable);
@ -153,25 +166,27 @@ public class BlockingExecutorService implements ExecutorService
@Override @Override
public void shutdown() public void shutdown()
{ {
isShutdown = true;
taskQueue.clear(); taskQueue.clear();
} }
@Override @Override
public List<Runnable> shutdownNow() public List<Runnable> shutdownNow()
{ {
shutdown();
return null; return null;
} }
@Override @Override
public boolean isShutdown() public boolean isShutdown()
{ {
return false; return isShutdown;
} }
@Override @Override
public boolean isTerminated() public boolean isTerminated()
{ {
return false; return isShutdown && taskQueue.isEmpty();
} }
@Override @Override