mirror of https://github.com/apache/druid.git
Supervisor list api with states and health (#7839)
* allow optionally listing all supervisors with their state and health * docs * add state to full * clean * casing * format * spelling
This commit is contained in:
parent
248e075e24
commit
3fbb0a5e00
|
@ -510,8 +510,22 @@ Returns a list of objects of the currently active supervisors.
|
|||
|Field|Type|Description|
|
||||
|---|---|---|
|
||||
|`id`|String|supervisor unique identifier|
|
||||
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|
||||
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|
||||
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|
||||
|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
|
||||
|
||||
* `/druid/indexer/v1/supervisor?state=true`
|
||||
|
||||
Returns a list of objects of the currently active supervisors and their current state.
|
||||
|
||||
|Field|Type|Description|
|
||||
|---|---|---|
|
||||
|`id`|String|supervisor unique identifier|
|
||||
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|
||||
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|
||||
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|
||||
|
||||
* `/druid/indexer/v1/supervisor/<supervisorId>`
|
||||
|
||||
Returns the current spec for the supervisor with the provided ID.
|
||||
|
|
|
@ -240,6 +240,12 @@ public class MaterializedViewSupervisor implements Supervisor
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SupervisorStateManager.State getState()
|
||||
{
|
||||
return stateManager.getSupervisorState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean isHealthy()
|
||||
{
|
||||
|
|
|
@ -385,10 +385,4 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
|
|||
{
|
||||
return spec.getIoConfig();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean isHealthy()
|
||||
{
|
||||
return stateManager.isHealthy();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,10 +316,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean isHealthy()
|
||||
{
|
||||
return stateManager.isHealthy();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,12 @@ public class SupervisorManager
|
|||
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.rhs);
|
||||
}
|
||||
|
||||
public Optional<SupervisorStateManager.State> getSupervisorState(String id)
|
||||
{
|
||||
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
|
||||
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
|
||||
}
|
||||
|
||||
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
|
||||
{
|
||||
Preconditions.checkState(started, "SupervisorManager not started");
|
||||
|
|
|
@ -52,6 +52,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -118,6 +119,7 @@ public class SupervisorResource
|
|||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response specGetAll(
|
||||
@QueryParam("full") String full,
|
||||
@QueryParam("state") Boolean state,
|
||||
@Context final HttpServletRequest req
|
||||
)
|
||||
{
|
||||
|
@ -128,20 +130,36 @@ public class SupervisorResource
|
|||
manager,
|
||||
manager.getSupervisorIds()
|
||||
);
|
||||
final boolean includeFull = full != null;
|
||||
final boolean includeState = state != null && state;
|
||||
|
||||
if (full == null) {
|
||||
return Response.ok(authorizedSupervisorIds).build();
|
||||
} else {
|
||||
List<Map<String, ?>> all =
|
||||
authorizedSupervisorIds.stream()
|
||||
.map(x -> ImmutableMap.<String, Object>builder()
|
||||
.put("id", x)
|
||||
.put("spec", manager.getSupervisorSpec(x).get())
|
||||
.build()
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
return Response.ok(all).build();
|
||||
if (includeFull || includeState) {
|
||||
List<Map<String, ?>> allStates = authorizedSupervisorIds
|
||||
.stream()
|
||||
.map(x -> {
|
||||
Optional<SupervisorStateManager.State> theState =
|
||||
manager.getSupervisorState(x);
|
||||
ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
|
||||
theBuilder.put("id", x);
|
||||
if (theState.isPresent()) {
|
||||
theBuilder.put("state", theState.get().getBasicState());
|
||||
theBuilder.put("detailedState", theState.get());
|
||||
theBuilder.put("healthy", theState.get().isHealthy());
|
||||
}
|
||||
if (includeFull) {
|
||||
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
|
||||
if (theSpec.isPresent()) {
|
||||
theBuilder.put("spec", theSpec.get());
|
||||
}
|
||||
}
|
||||
return theBuilder.build();
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
return Response.ok(allStates).build();
|
||||
}
|
||||
|
||||
return Response.ok(authorizedSupervisorIds).build();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -807,6 +807,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
|||
return generateReport(true);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SupervisorStateManager.State getState()
|
||||
{
|
||||
return stateManager.getSupervisorState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean isHealthy()
|
||||
{
|
||||
return stateManager.isHealthy();
|
||||
}
|
||||
|
||||
private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(
|
||||
boolean includeOffsets
|
||||
)
|
||||
|
|
|
@ -166,7 +166,7 @@ public class SupervisorResourceTest extends EasyMockSupport
|
|||
EasyMock.expectLastCall().anyTimes();
|
||||
replayAll();
|
||||
|
||||
Response response = supervisorResource.specGetAll(null, request);
|
||||
Response response = supervisorResource.specGetAll(null, null, request);
|
||||
verifyAll();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
|
@ -176,7 +176,7 @@ public class SupervisorResourceTest extends EasyMockSupport
|
|||
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
|
||||
replayAll();
|
||||
|
||||
response = supervisorResource.specGetAll(null, request);
|
||||
response = supervisorResource.specGetAll(null, null, request);
|
||||
verifyAll();
|
||||
|
||||
Assert.assertEquals(503, response.getStatus());
|
||||
|
@ -205,11 +205,15 @@ public class SupervisorResourceTest extends EasyMockSupport
|
|||
return Collections.singletonList("datasource2");
|
||||
}
|
||||
};
|
||||
SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
|
||||
SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
|
||||
|
||||
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
|
||||
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
|
||||
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
|
||||
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
|
||||
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
|
||||
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
|
||||
|
@ -219,7 +223,7 @@ public class SupervisorResourceTest extends EasyMockSupport
|
|||
EasyMock.expectLastCall().anyTimes();
|
||||
replayAll();
|
||||
|
||||
Response response = supervisorResource.specGetAll("", request);
|
||||
Response response = supervisorResource.specGetAll("", null, request);
|
||||
verifyAll();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
|
@ -233,6 +237,70 @@ public class SupervisorResourceTest extends EasyMockSupport
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecGetState()
|
||||
{
|
||||
Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
|
||||
SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
|
||||
{
|
||||
|
||||
@Override
|
||||
public List<String> getDataSources()
|
||||
{
|
||||
return Collections.singletonList("datasource1");
|
||||
}
|
||||
};
|
||||
SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
|
||||
{
|
||||
|
||||
@Override
|
||||
public List<String> getDataSources()
|
||||
{
|
||||
return Collections.singletonList("datasource2");
|
||||
}
|
||||
};
|
||||
|
||||
SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
|
||||
SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
|
||||
|
||||
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
|
||||
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
|
||||
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(1);
|
||||
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(1);
|
||||
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
|
||||
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
|
||||
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
|
||||
new AuthenticationResult("druid", "druid", null, null)
|
||||
).atLeastOnce();
|
||||
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
replayAll();
|
||||
|
||||
Response response = supervisorResource.specGetAll(null, true, request);
|
||||
verifyAll();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
List<Map<String, Object>> states = (List<Map<String, Object>>) response.getEntity();
|
||||
Assert.assertTrue(
|
||||
states.stream()
|
||||
.allMatch(state -> {
|
||||
final String id = (String) state.get("id");
|
||||
if ("id1".equals(id)) {
|
||||
return state1.equals(state.get("state"))
|
||||
&& state1.equals(state.get("detailedState"))
|
||||
&& (Boolean) state.get("healthy") == state1.isHealthy();
|
||||
} else if ("id2".equals(id)) {
|
||||
return state2.equals(state.get("state"))
|
||||
&& state2.equals(state.get("detailedState"))
|
||||
&& (Boolean) state.get("healthy") == state2.isHealthy();
|
||||
}
|
||||
return false;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecGet()
|
||||
{
|
||||
|
|
|
@ -113,6 +113,12 @@ public class NoopSupervisorSpec implements SupervisorSpec
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SupervisorStateManager.State getState()
|
||||
{
|
||||
return SupervisorStateManager.BasicState.RUNNING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset(DataSourceMetadata dataSourceMetadata)
|
||||
{
|
||||
|
|
|
@ -39,6 +39,8 @@ public interface Supervisor
|
|||
|
||||
SupervisorReport getStatus();
|
||||
|
||||
SupervisorStateManager.State getState();
|
||||
|
||||
default Map<String, Map<String, Object>> getStats()
|
||||
{
|
||||
return ImmutableMap.of();
|
||||
|
|
Loading…
Reference in New Issue