Add suspend|resume|terminate all supervisors endpoints. (#6272)

* ability to showdown all supervisors

* add doc

* address comments

* fix code style

* address comments

* change ternary assignment to if statement

* better docs
This commit is contained in:
QiuMM 2018-10-11 12:41:59 +08:00 committed by Jonathan Wei
parent f7775d1db3
commit f8f4526b16
4 changed files with 175 additions and 55 deletions

View File

@ -206,6 +206,13 @@ Suspend indexing tasks associated with a supervisor. Note that the supervisor it
operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor
is resumed. Responds with updated SupervisorSpec.
#### Suspend All Supervisors
```
POST /druid/indexer/v1/supervisor/suspendAll
```
Suspend all supervisors at once.
#### Resume Supervisor
```
@ -213,6 +220,13 @@ POST /druid/indexer/v1/supervisor/<supervisorId>/resume
```
Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
#### Resume All Supervisors
```
POST /druid/indexer/v1/supervisor/resumeAll
```
Resume all supervisors at once.
#### Reset Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/reset
@ -241,6 +255,12 @@ with the supervisor history api, but will not be listed in the 'get supervisors'
or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor
spec to the create api.
#### Terminate All Supervisors
```
POST /druid/indexer/v1/supervisor/terminateAll
```
Terminate all supervisors at once.
#### Shutdown Supervisor
_Deprecated: use the equivalent 'terminate' instead_
```

View File

@ -94,13 +94,31 @@ public class SupervisorManager
public boolean suspendOrResumeSupervisor(String id, boolean suspend)
{
Preconditions.checkState(started, "SupervisorManager not started");
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
Preconditions.checkNotNull(pair.rhs, "spec");
Preconditions.checkNotNull(id, "id");
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec();
possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
return createAndStartSupervisorInternal(nextState, true);
return possiblySuspendOrResumeSupervisorInternal(id, suspend);
}
}
public void stopAndRemoveAllSupervisors()
{
Preconditions.checkState(started, "SupervisorManager not started");
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
supervisors.keySet().forEach(id -> possiblyStopAndRemoveSupervisorInternal(id, true));
}
}
public void suspendOrResumeAllSupervisors(boolean suspend)
{
Preconditions.checkState(started, "SupervisorManager not started");
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
supervisors.keySet().forEach(id -> possiblySuspendOrResumeSupervisorInternal(id, suspend));
}
}
@ -206,7 +224,7 @@ public class SupervisorManager
* Stops a supervisor with a given id and then removes it from the list.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was stopped, false if there was no supervisor with this id
*/
@ -226,11 +244,32 @@ public class SupervisorManager
return true;
}
/**
* Suspend or resume a supervisor with a given id.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was suspended or resumed, false if there was no supervisor with this id
* or suspend a suspended supervisor or resume a running supervisor
*/
private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean suspend)
{
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
if (pair == null || pair.rhs.isSuspended() == suspend) {
return false;
}
SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec();
possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
return createAndStartSupervisorInternal(nextState, true);
}
/**
* Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a new supervisor was created, false if there was already an existing supervisor with this id
*/

View File

@ -260,6 +260,35 @@ public class SupervisorResource
);
}
@POST
@Path("/suspendAll")
@Produces(MediaType.APPLICATION_JSON)
public Response suspendAll()
{
return suspendOrResumeAll(true);
}
@POST
@Path("/resumeAll")
@Produces(MediaType.APPLICATION_JSON)
public Response resumeAll()
{
return suspendOrResumeAll(false);
}
@POST
@Path("/terminateAll")
@Produces(MediaType.APPLICATION_JSON)
public Response terminateAll()
{
return asLeaderWithSupervisorManager(
manager -> {
manager.stopAndRemoveAllSupervisors();
return Response.ok(ImmutableMap.of("status", "success")).build();
}
);
}
@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
@ -378,23 +407,34 @@ public class SupervisorResource
{
return asLeaderWithSupervisorManager(
manager -> {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
if (!spec.isPresent()) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
.build();
}
if (spec.get().isSuspended() == suspend) {
final String errMsg =
StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running");
return Response.status(Response.Status.BAD_REQUEST)
if (manager.suspendOrResumeSupervisor(id, suspend)) {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
return Response.ok(spec.get()).build();
} else {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
Response.Status status;
String errMsg;
if (spec.isPresent()) {
status = Response.Status.BAD_REQUEST;
errMsg = StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running");
} else {
status = Response.Status.NOT_FOUND;
errMsg = StringUtils.format("[%s] does not exist", id);
}
return Response.status(status)
.entity(ImmutableMap.of("error", errMsg))
.build();
}
manager.suspendOrResumeSupervisor(id, suspend);
spec = manager.getSupervisorSpec(id);
return Response.ok(spec.get()).build();
}
);
}
private Response suspendOrResumeAll(boolean suspend)
{
return asLeaderWithSupervisorManager(
manager -> {
manager.suspendOrResumeAllSupervisors(suspend);
return Response.ok(ImmutableMap.of("status", "success")).build();
}
);
}

View File

@ -292,14 +292,6 @@ public class SupervisorResourceTest extends EasyMockSupport
@Test
public void testSpecSuspend()
{
TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};
TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
@Override
public List<String> getDataSources()
@ -309,11 +301,8 @@ public class SupervisorResourceTest extends EasyMockSupport
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
.andReturn(Optional.of(running)).times(1)
.andReturn(Optional.of(suspended)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
Response response = supervisorResource.specSuspend("my-id");
@ -326,7 +315,8 @@ public class SupervisorResourceTest extends EasyMockSupport
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(false);
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
response = supervisorResource.specSuspend("my-id");
@ -336,18 +326,9 @@ public class SupervisorResourceTest extends EasyMockSupport
Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity());
}
@Test
public void testSpecResume()
{
TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};
TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
@Override
public List<String> getDataSources()
@ -357,11 +338,8 @@ public class SupervisorResourceTest extends EasyMockSupport
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
.andReturn(Optional.of(suspended)).times(1)
.andReturn(Optional.of(running)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
Response response = supervisorResource.specResume("my-id");
@ -374,7 +352,8 @@ public class SupervisorResourceTest extends EasyMockSupport
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(false);
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
response = supervisorResource.specResume("my-id");
@ -385,19 +364,19 @@ public class SupervisorResourceTest extends EasyMockSupport
}
@Test
public void testShutdown()
public void testTerminate()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false);
replayAll();
Response response = supervisorResource.shutdown("my-id");
Response response = supervisorResource.terminate("my-id");
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
response = supervisorResource.shutdown("my-id-2");
response = supervisorResource.terminate("my-id-2");
Assert.assertEquals(404, response.getStatus());
verifyAll();
@ -407,12 +386,54 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
response = supervisorResource.shutdown("my-id");
response = supervisorResource.terminate("my-id");
verifyAll();
Assert.assertEquals(503, response.getStatus());
}
@Test
public void testSuspendAll()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
supervisorManager.suspendOrResumeAllSupervisors(true);
EasyMock.expectLastCall();
replayAll();
Response response = supervisorResource.suspendAll();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity());
verifyAll();
}
@Test
public void testResumeAll()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
supervisorManager.suspendOrResumeAllSupervisors(false);
EasyMock.expectLastCall();
replayAll();
Response response = supervisorResource.resumeAll();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity());
verifyAll();
}
@Test
public void testTerminateAll()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
supervisorManager.stopAndRemoveAllSupervisors();
EasyMock.expectLastCall();
replayAll();
Response response = supervisorResource.terminateAll();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity());
verifyAll();
}
@Test
public void testSpecGetAllHistory()
{
@ -872,7 +893,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
response = supervisorResource.shutdown("my-id");
response = supervisorResource.terminate("my-id");
Assert.assertEquals(503, response.getStatus());
verifyAll();