fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140)

* fix issue with CuratorLoadQueuePeon shutting down executors it does not own

* use lifecycled executors

* maybe this
This commit is contained in:
Clint Wylie 2019-07-24 10:59:43 -07:00 committed by Jonathan Wei
parent 799d20249f
commit 0695e487e7
3 changed files with 27 additions and 9 deletions

View File

@ -335,8 +335,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
queuedSize.set(0L); queuedSize.set(0L);
failedAssignCount.set(0); failedAssignCount.set(0);
processingExecutor.shutdown();
callBackExecutor.shutdown();
} }
private void entryRemoved(SegmentHolder segmentHolder, String path) private void entryRemoved(SegmentHolder segmentHolder, String path)

View File

@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -119,6 +121,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig; private final ZkPathsConfig zkPathsConfig;
private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
public CuratorDruidCoordinatorTest() public CuratorDruidCoordinatorTest()
{ {
jsonMapper = TestHelper.makeJsonMapper(); jsonMapper = TestHelper.makeJsonMapper();
@ -187,16 +192,16 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
curator, curator,
SOURCE_LOAD_PATH, SOURCE_LOAD_PATH,
objectMapper, objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"), peonExec,
Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"), callbackExec,
druidCoordinatorConfig druidCoordinatorConfig
); );
destinationLoadQueuePeon = new CuratorLoadQueuePeon( destinationLoadQueuePeon = new CuratorLoadQueuePeon(
curator, curator,
DESTINATION_LOAD_PATH, DESTINATION_LOAD_PATH,
objectMapper, objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"), peonExec,
Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"), callbackExec,
druidCoordinatorConfig druidCoordinatorConfig
); );
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
@ -261,6 +266,15 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
@Rule @Rule
public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS); public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
@Test
public void testStopDoesntKillPoolItDoesntOwn() throws Exception
{
setupView();
sourceLoadQueuePeon.stop();
Assert.assertFalse(peonExec.isShutdown());
Assert.assertFalse(callbackExec.isShutdown());
}
@Test @Test
public void testMoveSegment() throws Exception public void testMoveSegment() throws Exception
{ {

View File

@ -47,7 +47,9 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule; import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServices;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.MetadataRuleManager;
@ -248,7 +250,8 @@ public class CliCoordinator extends ServerRunnable
ScheduledExecutorFactory factory, ScheduledExecutorFactory factory,
DruidCoordinatorConfig config, DruidCoordinatorConfig config,
@EscalatedGlobal HttpClient httpClient, @EscalatedGlobal HttpClient httpClient,
ZkPathsConfig zkPaths ZkPathsConfig zkPaths,
Lifecycle lifecycle
) )
{ {
boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
@ -256,9 +259,12 @@ public class CliCoordinator extends ServerRunnable
if (useHttpLoadQueuePeon) { if (useHttpLoadQueuePeon) {
callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
} else { } else {
callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon" callBackExec = Execs.multiThreaded(
+ "-callbackexec--%d"); config.getNumCuratorCallBackThreads(),
"LoadQueuePeon-callbackexec--%d"
);
} }
ExecutorServices.manageLifecycle(lifecycle, callBackExec);
return new LoadQueueTaskMaster( return new LoadQueueTaskMaster(
curator, curator,
jsonMapper, jsonMapper,