mirror of https://github.com/apache/druid.git
Fix race in CachingCostBalancerStrategyFactory (#4989)
* Fix race in CachingCostBalancerStrategyFactory * Remove timeout
This commit is contained in:
parent
d7024f22e1
commit
772ca783cd
|
@ -21,14 +21,11 @@ package io.druid.server.coordinator;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.client.ServerInventoryView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.java.util.common.concurrent.Execs;
|
||||
import io.druid.concurrent.LifecycleLock;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
|
@ -41,19 +38,21 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class CachingCostBalancerStrategyFactory implements BalancerStrategyFactory
|
||||
{
|
||||
private static final EmittingLogger LOG = new EmittingLogger(CachingCostBalancerStrategyFactory.class);
|
||||
|
||||
private final ServerInventoryView serverInventoryView;
|
||||
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
/** Must be single-threaded, because {@link ClusterCostCache.Builder} and downstream builders are not thread-safe */
|
||||
private final ExecutorService executor = Execs.singleThreaded("CachingCostBalancerStrategy-executor");
|
||||
private final ClusterCostCache.Builder clusterCostCacheBuilder = ClusterCostCache.builder();
|
||||
private volatile boolean initialized = false;
|
||||
/**
|
||||
* An atomic is needed for the compareAndSet(true, true) construction below, which is linearizable with the write made
|
||||
* from the callback and therefore ensures visibility of that write. Neither a plain field read nor a volatile field
|
||||
* read ensures such visibility.
|
||||
*/
|
||||
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
|
||||
@JsonCreator
|
||||
public CachingCostBalancerStrategyFactory(
|
||||
|
@ -61,85 +60,71 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
|
|||
@JacksonInject Lifecycle lifecycle
|
||||
) throws Exception
|
||||
{
|
||||
this.serverInventoryView = Preconditions.checkNotNull(serverInventoryView);
|
||||
// Adding to lifecycle dynamically because couldn't use @ManageLifecycle on the class,
|
||||
// see https://github.com/druid-io/druid/issues/4980
|
||||
lifecycle.addMaybeStartManagedInstance(this);
|
||||
|
||||
serverInventoryView.registerSegmentCallback(
|
||||
executor,
|
||||
new ServerView.SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
clusterCostCacheBuilder.addSegment(server.getName(), segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
clusterCostCacheBuilder.removeSegment(server.getName(), segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentViewInitialized()
|
||||
{
|
||||
initialized.set(true);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
serverInventoryView.registerServerRemovedCallback(
|
||||
executor,
|
||||
server -> {
|
||||
clusterCostCacheBuilder.removeServer(server.getName());
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
if (!lifecycleLock.canStart()) {
|
||||
throw new ISE("CachingCostBalancerStrategyFactory can not be started");
|
||||
}
|
||||
try {
|
||||
serverInventoryView.registerSegmentCallback(
|
||||
executor,
|
||||
new ServerView.SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
clusterCostCacheBuilder.addSegment(server.getName(), segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
clusterCostCacheBuilder.removeSegment(server.getName(), segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentViewInitialized()
|
||||
{
|
||||
initialized = true;
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
serverInventoryView.registerServerRemovedCallback(
|
||||
executor,
|
||||
server -> {
|
||||
clusterCostCacheBuilder.removeServer(server.getName());
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
);
|
||||
|
||||
lifecycleLock.started();
|
||||
}
|
||||
finally {
|
||||
lifecycleLock.exitStart();
|
||||
}
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
if (!lifecycleLock.canStop()) {
|
||||
throw new ISE("CachingCostBalancerStrategyFactory can not be stopped");
|
||||
}
|
||||
executor.shutdownNow();
|
||||
// Not calling lifecycleLock.exitStop() because CachingCostBalancerStrategyFactory is not recyclable
|
||||
}
|
||||
|
||||
@Override
|
||||
public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec)
|
||||
{
|
||||
if (!lifecycleLock.awaitStarted(1, TimeUnit.MINUTES)) {
|
||||
throw new ISE("CachingCostBalancerStrategyFactory is not started");
|
||||
}
|
||||
if (initialized) {
|
||||
if (initialized.compareAndSet(true, true)) {
|
||||
try {
|
||||
// Calling clusterCostCacheBuilder.build() in the same thread (executor's sole thread) where
|
||||
// clusterCostCacheBuilder is updated, to avoid problems with concurrent updates
|
||||
CompletableFuture<CachingCostBalancerStrategy> future = CompletableFuture.supplyAsync(
|
||||
() -> new CachingCostBalancerStrategy(clusterCostCacheBuilder.build(), exec),
|
||||
executor
|
||||
);
|
||||
try {
|
||||
return future.get(1, TimeUnit.SECONDS);
|
||||
return future.get();
|
||||
}
|
||||
catch (CancellationException e) {
|
||||
LOG.error("CachingCostBalancerStrategy creation has been cancelled");
|
||||
|
@ -147,9 +132,6 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
|
|||
catch (ExecutionException e) {
|
||||
LOG.error(e, "Failed to create CachingCostBalancerStrategy");
|
||||
}
|
||||
catch (TimeoutException e) {
|
||||
LOG.error("CachingCostBalancerStrategy creation took more than 1 second!");
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
LOG.error("CachingCostBalancerStrategy creation has been interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
Loading…
Reference in New Issue