From f8f4526b168568b770b0fd309d3bc2c59dfe4e27 Mon Sep 17 00:00:00 2001 From: QiuMM Date: Thu, 11 Oct 2018 12:41:59 +0800 Subject: [PATCH] Add suspend|resume|terminate all supervisors endpoints. (#6272) * ability to shutdown all supervisors * add doc * address comments * fix code style * address comments * change ternary assignment to if statement * better docs --- .../extensions-core/kafka-ingestion.md | 24 +++++- .../supervisor/SupervisorManager.java | 53 ++++++++++-- .../supervisor/SupervisorResource.java | 68 ++++++++++++--- .../supervisor/SupervisorResourceTest.java | 85 ++++++++++++------- 4 files changed, 175 insertions(+), 55 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index ebc240a2d16..568fc94fe30 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -206,13 +206,27 @@ Suspend indexing tasks associated with a supervisor. Note that the supervisor it operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor is resumed. Responds with updated SupervisorSpec. -#### Resume Supervisor +#### Suspend All Supervisors + +``` +POST /druid/indexer/v1/supervisor/suspendAll +``` +Suspend all supervisors at once. + +#### Resume Supervisor ``` POST /druid/indexer/v1/supervisor/<supervisorId>/resume ``` Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec. +#### Resume All Supervisors + +``` +POST /druid/indexer/v1/supervisor/resumeAll +``` +Resume all supervisors at once. + #### Reset Supervisor ``` POST /druid/indexer/v1/supervisor/<supervisorId>/reset @@ -241,7 +255,13 @@ with the supervisor history api, but will not be listed in the 'get supervisors' or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor spec to the create api. 
-#### Shutdown Supervisor +#### Terminate All Supervisors +``` +POST /druid/indexer/v1/supervisor/terminateAll +``` +Terminate all supervisors at once. + +#### Shutdown Supervisor _Deprecated: use the equivalent 'terminate' instead_ ``` POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 22fd82746a7..cfca0de4b60 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -94,13 +94,31 @@ public class SupervisorManager public boolean suspendOrResumeSupervisor(String id, boolean suspend) { Preconditions.checkState(started, "SupervisorManager not started"); - Pair pair = supervisors.get(id); - Preconditions.checkNotNull(pair.rhs, "spec"); + Preconditions.checkNotNull(id, "id"); + synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); - SupervisorSpec nextState = suspend ? 
pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); - possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); - return createAndStartSupervisorInternal(nextState, true); + return possiblySuspendOrResumeSupervisorInternal(id, suspend); + } + } + + public void stopAndRemoveAllSupervisors() + { + Preconditions.checkState(started, "SupervisorManager not started"); + + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + supervisors.keySet().forEach(id -> possiblyStopAndRemoveSupervisorInternal(id, true)); + } + } + + public void suspendOrResumeAllSupervisors(boolean suspend) + { + Preconditions.checkState(started, "SupervisorManager not started"); + + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + supervisors.keySet().forEach(id -> possiblySuspendOrResumeSupervisorInternal(id, suspend)); } } @@ -206,7 +224,7 @@ public class SupervisorManager * Stops a supervisor with a given id and then removes it from the list. *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be - * starting and stopping supervisors. + * starting, stopping, suspending and resuming supervisors. * * @return true if a supervisor was stopped, false if there was no supervisor with this id */ @@ -226,11 +244,32 @@ public class SupervisorManager return true; } + /** + * Suspend or resume a supervisor with a given id. + *

+ * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be + * starting, stopping, suspending and resuming supervisors. + * + * @return true if the supervisor was suspended or resumed, false if there is no supervisor with this id + * or the supervisor is already in the requested state (already suspended or already running) + */ + private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean suspend) + { + Pair pair = supervisors.get(id); + if (pair == null || pair.rhs.isSuspended() == suspend) { + return false; + } + + SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); + possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); + return createAndStartSupervisorInternal(nextState, true); + } + /** * Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id. *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be - * starting and stopping supervisors. + * starting, stopping, suspending and resuming supervisors. * * @return true if a new supervisor was created, false if there was already an existing supervisor with this id */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index d3e19bbb4ab..97e0580376e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -260,6 +260,35 @@ public class SupervisorResource ); } + @POST + @Path("/suspendAll") + @Produces(MediaType.APPLICATION_JSON) + public Response suspendAll() + { + return suspendOrResumeAll(true); + } + + @POST + @Path("/resumeAll") + @Produces(MediaType.APPLICATION_JSON) + public Response resumeAll() + { + return suspendOrResumeAll(false); + } + + @POST + @Path("/terminateAll") + @Produces(MediaType.APPLICATION_JSON) + public Response terminateAll() + { + return asLeaderWithSupervisorManager( + manager -> { + manager.stopAndRemoveAllSupervisors(); + return Response.ok(ImmutableMap.of("status", "success")).build(); + } + ); + } + @GET @Path("/history") @Produces(MediaType.APPLICATION_JSON) @@ -378,23 +407,34 @@ public class SupervisorResource { return asLeaderWithSupervisorManager( manager -> { - Optional spec = manager.getSupervisorSpec(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - if (spec.get().isSuspended() == suspend) { - final String errMsg = - StringUtils.format("[%s] is already %s", id, suspend ? 
"suspended" : "running"); - return Response.status(Response.Status.BAD_REQUEST) + if (manager.suspendOrResumeSupervisor(id, suspend)) { + Optional spec = manager.getSupervisorSpec(id); + return Response.ok(spec.get()).build(); + } else { + Optional spec = manager.getSupervisorSpec(id); + Response.Status status; + String errMsg; + if (spec.isPresent()) { + status = Response.Status.BAD_REQUEST; + errMsg = StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running"); + } else { + status = Response.Status.NOT_FOUND; + errMsg = StringUtils.format("[%s] does not exist", id); + } + return Response.status(status) .entity(ImmutableMap.of("error", errMsg)) .build(); } - manager.suspendOrResumeSupervisor(id, suspend); - spec = manager.getSupervisorSpec(id); - return Response.ok(spec.get()).build(); + } + ); + } + + private Response suspendOrResumeAll(boolean suspend) + { + return asLeaderWithSupervisorManager( + manager -> { + manager.suspendOrResumeAllSupervisors(suspend); + return Response.ok(ImmutableMap.of("status", "success")).build(); } ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 9d6eab33e1e..d893898c5a1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -292,14 +292,6 @@ public class SupervisorResourceTest extends EasyMockSupport @Test public void testSpecSuspend() { - - TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { @Override public List getDataSources() @@ 
-309,11 +301,8 @@ public class SupervisorResourceTest extends EasyMockSupport }; EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(running)).times(1) - .andReturn(Optional.of(suspended)).times(1); EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true); - EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)); replayAll(); Response response = supervisorResource.specSuspend("my-id"); @@ -326,7 +315,8 @@ public class SupervisorResourceTest extends EasyMockSupport resetAll(); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce(); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(false); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)); replayAll(); response = supervisorResource.specSuspend("my-id"); @@ -336,18 +326,9 @@ public class SupervisorResourceTest extends EasyMockSupport Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity()); } - - @Test public void testSpecResume() { - TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { @Override public List getDataSources() @@ -357,11 +338,8 @@ public class SupervisorResourceTest extends EasyMockSupport }; EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - 
.andReturn(Optional.of(suspended)).times(1) - .andReturn(Optional.of(running)).times(1); EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true); - EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)); replayAll(); Response response = supervisorResource.specResume("my-id"); @@ -374,7 +352,8 @@ public class SupervisorResourceTest extends EasyMockSupport resetAll(); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce(); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(false); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)); replayAll(); response = supervisorResource.specResume("my-id"); @@ -385,19 +364,19 @@ public class SupervisorResourceTest extends EasyMockSupport } @Test - public void testShutdown() + public void testTerminate() { EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true); EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false); replayAll(); - Response response = supervisorResource.shutdown("my-id"); + Response response = supervisorResource.terminate("my-id"); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); - response = supervisorResource.shutdown("my-id-2"); + response = supervisorResource.terminate("my-id-2"); Assert.assertEquals(404, response.getStatus()); verifyAll(); @@ -407,12 +386,54 @@ public class SupervisorResourceTest extends EasyMockSupport EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = 
supervisorResource.shutdown("my-id"); + response = supervisorResource.terminate("my-id"); verifyAll(); Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSuspendAll() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + supervisorManager.suspendOrResumeAllSupervisors(true); + EasyMock.expectLastCall(); + replayAll(); + + Response response = supervisorResource.suspendAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); + verifyAll(); + } + + @Test + public void testResumeAll() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + supervisorManager.suspendOrResumeAllSupervisors(false); + EasyMock.expectLastCall(); + replayAll(); + + Response response = supervisorResource.resumeAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); + verifyAll(); + } + + @Test + public void testTerminateAll() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + supervisorManager.stopAndRemoveAllSupervisors(); + EasyMock.expectLastCall(); + replayAll(); + + Response response = supervisorResource.terminateAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); + verifyAll(); + } + @Test public void testSpecGetAllHistory() { @@ -872,7 +893,7 @@ public class SupervisorResourceTest extends EasyMockSupport EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.shutdown("my-id"); + response = supervisorResource.terminate("my-id"); Assert.assertEquals(503, response.getStatus()); verifyAll();