recreate the balancer executor only when needed (#10280)

* recreate the balancer executor only when needed

* fix UT error

* shutdown the balancer executor in stopBeingLeader and stop

* remove commented code

* remove comments
This commit is contained in:
Arvin.Z 2020-09-16 12:25:57 -07:00 committed by GitHub
parent 94226f1b3d
commit 1b05d6e542
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 128 additions and 12 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@ -149,6 +150,9 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;
@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
@ -483,6 +487,18 @@ public class DruidCoordinator
}
}
@VisibleForTesting
public int getCachedBalancerThreadNumber()
{
return cachedBalancerThreadNumber;
}
@VisibleForTesting
public ListeningExecutorService getBalancerExec()
{
return balancerExec;
}
@LifecycleStart
public void start()
{
@ -524,6 +540,10 @@ public class DruidCoordinator
started = false;
exec.shutdownNow();
if (balancerExec != null) {
balancerExec.shutdownNow();
}
}
}
@ -612,6 +632,11 @@ public class DruidCoordinator
lookupCoordinatorManager.stop();
metadataRuleManager.stop();
segmentsMetadataManager.stopPollingDatabasePeriodically();
if (balancerExec != null) {
balancerExec.shutdownNow();
balancerExec = null;
}
}
}
@ -647,22 +672,52 @@ public class DruidCoordinator
return ImmutableList.of(compactSegments);
}
private class DutiesRunnable implements Runnable
@VisibleForTesting
protected class DutiesRunnable implements Runnable
{
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final int startingLeaderCounter;
private DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;
}
@VisibleForTesting
protected void initBalancerExecutor()
{
final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
final String threadNameFormat = "coordinator-cost-balancer-%s";
// fist time initialization
if (balancerExec == null) {
balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
currentNumber,
threadNameFormat
));
cachedBalancerThreadNumber = currentNumber;
return;
}
if (cachedBalancerThreadNumber != currentNumber) {
log.info(
"balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.",
cachedBalancerThreadNumber,
currentNumber
);
balancerExec.shutdownNow();
balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
currentNumber,
threadNameFormat
));
cachedBalancerThreadNumber = currentNumber;
}
}
@Override
public void run()
{
ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
@ -684,10 +739,7 @@ public class DruidCoordinator
}
}
balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
getDynamicConfigs().getBalancerComputeThreads(),
"coordinator-cost-balancer-%s"
));
initBalancerExecutor();
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
@ -733,11 +785,6 @@ public class DruidCoordinator
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
}
finally {
if (balancerExec != null) {
balancerExec.shutdownNow();
}
}
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
@ -665,6 +666,74 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
@Test
public void testBalancerThreadNumber()
{
CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference(dynamicConfig)).anyTimes();
ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
DruidCoordinator c = new DruidCoordinator(
null,
null,
configManager,
null,
null,
null,
null,
null,
scheduledExecutorFactory,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0);
// before initialization
Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
Assert.assertNull(c.getBalancerExec());
// first initialization
duty.initBalancerExecutor();
System.out.println("c.getCachedBalancerThreadNumber(): " + c.getCachedBalancerThreadNumber());
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService firstExec = c.getBalancerExec();
Assert.assertNotNull(firstExec);
// second initialization, expect no changes as cachedBalancerThreadNumber is not changed
duty.initBalancerExecutor();
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService secondExec = c.getBalancerExec();
Assert.assertNotNull(secondExec);
Assert.assertTrue(firstExec == secondExec);
// third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
duty.initBalancerExecutor();
Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
ListeningExecutorService thirdExec = c.getBalancerExec();
Assert.assertNotNull(thirdExec);
Assert.assertFalse(secondExec == thirdExec);
Assert.assertFalse(firstExec == thirdExec);
}
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,