Add `sys.supervisors` table to system tables (#8547)

* Add supervisors table to SystemSchema

* Add docs

* fix checkstyle

* fix test

* fix CI

* Add comments

* Fix javadoc teamcity error

* comments

* fix links in docs

* fix links

* rename fullStatus query param to system and remove it from docs
This commit is contained in:
Surekha 2019-10-18 15:16:42 -07:00 committed by Jonathan Wei
parent d88075237a
commit 98f59ddd7e
19 changed files with 857 additions and 40 deletions

View File

@ -507,8 +507,8 @@ Returns a list of objects of the currently active supervisors.
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details), e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
@ -519,9 +519,10 @@ Returns a list of objects of the currently active supervisors and their current
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state|
* `/druid/indexer/v1/supervisor/<supervisorId>`

View File

@ -810,6 +810,27 @@ For example, to retrieve tasks information filtered by status, use the query
SELECT * FROM sys.tasks WHERE status='FAILED';
```
#### SUPERVISORS table
The supervisors table provides information about supervisors.
|Column|Type|Notes|
|------|-----|-----|
|supervisor_id|STRING|Supervisor task identifier|
|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor|
|type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`|
|source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream|
|suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state|
|spec|STRING|JSON-serialized supervisor spec|
For example, to retrieve supervisor tasks information filtered by health status, use the query
```sql
SELECT * FROM sys.supervisors WHERE healthy=0;
```
Note that sys tables may not support all the Druid SQL Functions.
## Server configuration

View File

@ -62,6 +62,7 @@ import java.util.Set;
public class MaterializedViewSupervisorSpec implements SupervisorSpec
{
private static final String TASK_PREFIX = "index_materialized_view";
private static final String SUPERVISOR_TYPE = "materialized_view";
private final String baseDataSource;
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] aggregators;
@ -325,6 +326,20 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
return suspended;
}
@Override
@JsonProperty("type")
public String getType()
{
return SUPERVISOR_TYPE;
}
@Override
@JsonProperty("source")
public String getSource()
{
return getBaseDataSource();
}
@Override
public String getId()
{

View File

@ -40,6 +40,8 @@ import java.util.Map;
public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String TASK_TYPE = "kafka";
@JsonCreator
public KafkaSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@ -103,6 +105,18 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
);
}
@Override
public String getType()
{
return TASK_TYPE;
}
@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getTopic() : null;
}
@Override
public Supervisor createSupervisor()
{

View File

@ -42,6 +42,7 @@ import java.util.Map;
public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String SUPERVISOR_TYPE = "kinesis";
private final AWSCredentialsConfig awsCredentialsConfig;
@JsonCreator
@ -132,6 +133,18 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
);
}
@Override
public String getType()
{
return SUPERVISOR_TYPE;
}
@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getStream() : null;
}
@Override
public String toString()
{

View File

@ -19,6 +19,8 @@
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -79,12 +81,14 @@ public class SupervisorResource
private final TaskMaster taskMaster;
private final AuthorizerMapper authorizerMapper;
private final ObjectMapper objectMapper;
@Inject
public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper)
public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper, ObjectMapper objectMapper)
{
this.taskMaster = taskMaster;
this.authorizerMapper = authorizerMapper;
this.objectMapper = objectMapper;
}
@POST
@ -120,6 +124,7 @@ public class SupervisorResource
public Response specGetAll(
@QueryParam("full") String full,
@QueryParam("state") Boolean state,
@QueryParam("system") String system,
@Context final HttpServletRequest req
)
{
@ -132,24 +137,44 @@ public class SupervisorResource
);
final boolean includeFull = full != null;
final boolean includeState = state != null && state;
final boolean includeSystem = system != null;
if (includeFull || includeState) {
List<Map<String, ?>> allStates = authorizedSupervisorIds
if (includeFull || includeState || includeSystem) {
List<SupervisorStatus> allStates = authorizedSupervisorIds
.stream()
.map(x -> {
Optional<SupervisorStateManager.State> theState =
manager.getSupervisorState(x);
ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
theBuilder.put("id", x);
SupervisorStatus.Builder theBuilder = new SupervisorStatus.Builder();
theBuilder.withId(x);
if (theState.isPresent()) {
theBuilder.put("state", theState.get().getBasicState());
theBuilder.put("detailedState", theState.get());
theBuilder.put("healthy", theState.get().isHealthy());
theBuilder.withState(theState.get().getBasicState().toString())
.withDetailedState(theState.get().toString())
.withHealthy(theState.get().isHealthy());
}
if (includeFull) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
theBuilder.put("spec", theSpec.get());
theBuilder.withSpec(manager.getSupervisorSpec(x).get());
}
}
if (includeSystem) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
try {
// serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system`
// which are outside the overlord process can deserialize the response and get a json
// payload of SupervisorSpec object when they don't have guice bindings for all the fields
// for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or
// `KinesisSupervisorSpec`
theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get()));
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
theBuilder.withType(manager.getSupervisorSpec(x).get().getType())
.withSource(manager.getSupervisorSpec(x).get().getSource())
.withSuspended(manager.getSupervisorSpec(x).get().isSuspended());
}
}
return theBuilder.build();

View File

@ -149,6 +149,18 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
{
return false;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
};
EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
.andReturn(Optional.of(supervisorSpec))

View File

@ -410,6 +410,18 @@ public class SupervisorManagerTest extends EasyMockSupport
return suspended;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
@Override
public List<String> getDataSources()
{

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
@ -45,6 +46,7 @@ import org.junit.runner.RunWith;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -53,6 +55,7 @@ import java.util.Set;
@RunWith(EasyMockRunner.class)
public class SupervisorResourceTest extends EasyMockSupport
{
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final TestSupervisorSpec SPEC1 = new TestSupervisorSpec(
"id1",
null,
@ -100,7 +103,8 @@ public class SupervisorResourceTest extends EasyMockSupport
}
};
}
}
},
OBJECT_MAPPER
);
}
@ -160,7 +164,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expectLastCall().anyTimes();
replayAll();
Response response = supervisorResource.specGetAll(null, null, request);
Response response = supervisorResource.specGetAll(null, null, null, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
@ -170,7 +174,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
response = supervisorResource.specGetAll(null, null, request);
response = supervisorResource.specGetAll(null, null, null, request);
verifyAll();
Assert.assertEquals(503, response.getStatus());
@ -184,10 +188,10 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2);
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@ -197,20 +201,59 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expectLastCall().anyTimes();
replayAll();
Response response = supervisorResource.specGetAll("", null, request);
Response response = supervisorResource.specGetAll("", null, null, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
List<Map<String, Object>> specs = (List<Map<String, Object>>) response.getEntity();
List<SupervisorStatus> specs = (List<SupervisorStatus>) response.getEntity();
Assert.assertTrue(
specs.stream()
.allMatch(spec ->
("id1".equals(spec.get("id")) && SPEC1.equals(spec.get("spec"))) ||
("id2".equals(spec.get("id")) && SPEC2.equals(spec.get("spec")))
("id1".equals(spec.getId()) && SPEC1.equals(spec.getSpec())) ||
("id2".equals(spec.getId()) && SPEC2.equals(spec.getSpec()))
)
);
}
@Test
public void testSpecGetAllSystem()
{
SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes();
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
new AuthenticationResult("druid", "druid", null, null)
).atLeastOnce();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
replayAll();
Response response = supervisorResource.specGetAll(null, null, "", request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
List<SupervisorStatus> specs = (List<SupervisorStatus>) response.getEntity();
specs.sort(Comparator.comparing(SupervisorStatus::getId));
Assert.assertEquals(2, specs.size());
SupervisorStatus spec = specs.get(0);
Assert.assertEquals("id1", spec.getId());
Assert.assertEquals("RUNNING", spec.getState());
Assert.assertEquals("RUNNING", spec.getDetailedState());
Assert.assertEquals(true, spec.isHealthy());
Assert.assertEquals("{\"type\":\"SupervisorResourceTest$TestSupervisorSpec\"}", spec.getSpecString());
Assert.assertEquals("test", spec.getType());
Assert.assertEquals("dummy", spec.getSource());
Assert.assertEquals(false, spec.isSuspended());
}
@Test
public void testSpecGetState()
{
@ -232,23 +275,23 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expectLastCall().anyTimes();
replayAll();
Response response = supervisorResource.specGetAll(null, true, request);
Response response = supervisorResource.specGetAll(null, true, null, request);
verifyAll();
Assert.assertEquals(200, response.getStatus());
List<Map<String, Object>> states = (List<Map<String, Object>>) response.getEntity();
List<SupervisorStatus> states = (List<SupervisorStatus>) response.getEntity();
Assert.assertTrue(
states.stream()
.allMatch(state -> {
final String id = (String) state.get("id");
final String id = (String) state.getId();
if ("id1".equals(id)) {
return state1.equals(state.get("state"))
&& state1.equals(state.get("detailedState"))
&& (Boolean) state.get("healthy") == state1.isHealthy();
return state1.toString().equals(state.getState())
&& state1.toString().equals(state.getDetailedState())
&& (Boolean) state.isHealthy() == state1.isHealthy();
} else if ("id2".equals(id)) {
return state2.equals(state.get("state"))
&& state2.equals(state.get("detailedState"))
&& (Boolean) state.get("healthy") == state2.isHealthy();
return state2.toString().equals(state.getState())
&& state2.toString().equals(state.getDetailedState())
&& (Boolean) state.isHealthy() == state2.isHealthy();
}
return false;
})
@ -1137,6 +1180,18 @@ public class SupervisorResourceTest extends EasyMockSupport
return suspended;
}
@Override
public String getType()
{
return "test";
}
@Override
public String getSource()
{
return "dummy";
}
@Override
public boolean equals(Object o)
{

View File

@ -48,25 +48,36 @@ public class NoopSupervisorSpec implements SupervisorSpec
@JsonProperty("suspended")
private boolean suspended; //ignored
@JsonProperty("type")
private String type; //ignored
@JsonProperty("source")
private String source; //ignored
@VisibleForTesting
public NoopSupervisorSpec(
String id,
List<String> datasources
)
{
this(id, datasources, null);
this(id, datasources, null, null, null);
}
@JsonCreator
public NoopSupervisorSpec(
@JsonProperty("id") @Nullable String id,
@JsonProperty("dataSources") @Nullable List<String> datasources,
@JsonProperty("suspended") @Nullable Boolean suspended
@JsonProperty("suspended") @Nullable Boolean suspended,
@JsonProperty("type") @Nullable String type,
@JsonProperty("source") @Nullable String source
)
{
this.id = id;
this.datasources = datasources == null ? new ArrayList<>() : datasources;
this.suspended = false; // ignore
// these are ignored
this.suspended = false;
this.type = "noop";
this.source = "noop";
}
@Override
@ -92,6 +103,20 @@ public class NoopSupervisorSpec implements SupervisorSpec
return suspended;
}
@Override
@JsonProperty("type")
public String getType()
{
return type;
}
@Override
@JsonProperty("source")
public String getSource()
{
return source;
}
@Override
public Supervisor createSupervisor()
{

View File

@ -56,4 +56,20 @@ public interface SupervisorSpec
{
return false;
}
/**
* This API is only used for informational purposes in
* org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable
*
* @return supervisor type
*/
String getType();
/**
* This API is only used for informational purposes in
* org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable
*
* @return source like stream or topic name
*/
String getSource();
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;
import java.util.Objects;
/**
* This class contains the attributes of a supervisor which are returned by the API's in
* org.apache.druid.indexing.overlord.supervisor.SupervisorResource
* and used by org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable
*/
@JsonDeserialize(builder = SupervisorStatus.Builder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SupervisorStatus
{
private final String id;
private final String state;
private final String detailedState;
private final boolean healthy;
private final SupervisorSpec spec;
/**
* This is a JSON representation of {@code spec}
* The explicit serialization is present here so that users of {@code SupervisorStatus} which cannot
* deserialize {@link SupervisorSpec} can use this attribute instead
*/
private final String specString;
private final String type;
private final String source;
private final boolean suspended;
private SupervisorStatus(
Builder builder
)
{
this.id = Preconditions.checkNotNull(builder.id, "id");
this.state = builder.state;
this.detailedState = builder.detailedState;
this.healthy = builder.healthy;
this.spec = builder.spec;
this.specString = builder.specString;
this.type = builder.type;
this.source = builder.source;
this.suspended = builder.suspended;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SupervisorStatus that = (SupervisorStatus) o;
return healthy == that.healthy &&
Objects.equals(id, that.id) &&
Objects.equals(state, that.state) &&
Objects.equals(detailedState, that.detailedState) &&
Objects.equals(spec, that.spec) &&
Objects.equals(specString, that.specString) &&
Objects.equals(type, that.type) &&
Objects.equals(source, that.source) &&
suspended == that.suspended;
}
@Override
public int hashCode()
{
return Objects.hash(id, state, detailedState, healthy, spec, specString, type, source, suspended);
}
@JsonProperty
public String getId()
{
return id;
}
@JsonProperty
public String getState()
{
return state;
}
@JsonProperty
public String getDetailedState()
{
return detailedState;
}
@JsonProperty
public boolean isHealthy()
{
return healthy;
}
@JsonProperty
public SupervisorSpec getSpec()
{
return spec;
}
@JsonProperty
public String getSpecString()
{
return specString;
}
@JsonProperty
public String getType()
{
return type;
}
@JsonProperty
public String getSource()
{
return source;
}
@JsonProperty
public boolean isSuspended()
{
return suspended;
}
@JsonPOJOBuilder
public static class Builder
{
@JsonProperty("id")
private String id;
@JsonProperty("state")
private String state;
@JsonProperty("detailedState")
private String detailedState;
@JsonProperty("healthy")
private boolean healthy;
@JsonProperty("spec")
private SupervisorSpec spec;
@JsonProperty("specString")
private String specString;
@JsonProperty("type")
private String type;
@JsonProperty("source")
private String source;
@JsonProperty("suspended")
private boolean suspended;
@JsonProperty
public Builder withId(String id)
{
this.id = Preconditions.checkNotNull(id, "id");
return this;
}
@JsonProperty
public Builder withState(String state)
{
this.state = state;
return this;
}
@JsonProperty
public Builder withDetailedState(String detailedState)
{
this.detailedState = detailedState;
return this;
}
@JsonProperty
public Builder withHealthy(boolean healthy)
{
this.healthy = healthy;
return this;
}
@JsonProperty
public Builder withSpec(SupervisorSpec spec)
{
this.spec = spec;
return this;
}
@JsonProperty
public Builder withSpecString(String spec)
{
this.specString = spec;
return this;
}
@JsonProperty
public Builder withType(String type)
{
this.type = type;
return this;
}
@JsonProperty
public Builder withSource(String source)
{
this.source = source;
return this;
}
@JsonProperty
public Builder withSuspended(boolean suspended)
{
this.suspended = suspended;
return this;
}
public SupervisorStatus build()
{
return new SupervisorStatus(this);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class SupervisorStatusTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new DefaultObjectMapper();
final SupervisorStatus.Builder builder = new SupervisorStatus.Builder();
final SupervisorStatus supervisorStatus = builder.withId("wikipedia")
.withState("RUNNING")
.withDetailedState("RUNNING")
.withHealthy(true)
.withType("kafka")
.withSource("wikipedia")
.withSuspended(false)
.build();
final String serialized = mapper.writeValueAsString(supervisorStatus);
final SupervisorStatus deserialized = mapper.readValue(serialized, SupervisorStatus.class);
Assert.assertEquals(supervisorStatus, deserialized);
}
@Test
public void testJsonAttr() throws IOException
{
String json = "{"
+ "\"id\":\"wikipedia\","
+ "\"state\":\"UNHEALTHY_SUPERVISOR\","
+ "\"detailedState\":\"UNHEALTHY_SUPERVISOR\","
+ "\"healthy\":false,"
+ "\"type\":\"kafka\","
+ "\"source\":\"wikipedia\","
+ "\"suspended\":false"
+ "}";
final ObjectMapper mapper = new ObjectMapper();
final SupervisorStatus deserialized = mapper.readValue(json, SupervisorStatus.class);
Assert.assertNotNull(deserialized);
Assert.assertEquals("wikipedia", deserialized.getId());
final String serialized = mapper.writeValueAsString(deserialized);
Assert.assertTrue(serialized.contains("\"source\""));
Assert.assertEquals(json, serialized);
}
}

View File

@ -190,5 +190,18 @@ public class SQLMetadataSupervisorManagerTest
{
return Collections.singletonList(dataSource);
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
}
}

View File

@ -58,6 +58,18 @@ public class TestSupervisorSpec implements SupervisorSpec
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
@JsonProperty
public Object getData()
{

View File

@ -55,6 +55,7 @@ import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -99,6 +100,7 @@ public class SystemSchema extends AbstractSchema
private static final String SERVERS_TABLE = "servers";
private static final String SERVER_SEGMENTS_TABLE = "server_segments";
private static final String TASKS_TABLE = "tasks";
private static final String SUPERVISOR_TABLE = "supervisors";
private static final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>>
SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment ->
@ -180,6 +182,18 @@ public class SystemSchema extends AbstractSchema
.add("error_msg", ValueType.STRING)
.build();
static final RowSignature SUPERVISOR_SIGNATURE = RowSignature
.builder()
.add("supervisor_id", ValueType.STRING)
.add("state", ValueType.STRING)
.add("detailed_state", ValueType.STRING)
.add("healthy", ValueType.LONG)
.add("type", ValueType.STRING)
.add("source", ValueType.STRING)
.add("suspended", ValueType.LONG)
.add("spec", ValueType.STRING)
.build();
private final Map<String, Table> tableMap;
@Inject
@ -207,7 +221,8 @@ public class SystemSchema extends AbstractSchema
SEGMENTS_TABLE, segmentsTable,
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
);
}
@ -729,7 +744,7 @@ public class SystemSchema extends AbstractSchema
try {
request = indexingServiceClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/indexer/v1/tasks"),
"/druid/indexer/v1/tasks",
false
);
}
@ -755,6 +770,170 @@ public class SystemSchema extends AbstractSchema
);
}
/**
* This table contains a row per supervisor task.
*/
static class SupervisorsTable extends AbstractTable implements ScannableTable
{
private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper;
public SupervisorsTable(
DruidLeaderClient druidLeaderClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper
)
{
this.druidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.authorizerMapper = authorizerMapper;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory)
{
return SUPERVISOR_SIGNATURE.getRelDataType(typeFactory);
}
@Override
public TableType getJdbcTableType()
{
return TableType.SYSTEM_TABLE;
}
@Override
public Enumerable<Object[]> scan(DataContext root)
{
class SupervisorsEnumerable extends DefaultEnumerable<Object[]>
{
private final CloseableIterator<SupervisorStatus> it;
public SupervisorsEnumerable(JsonParserIterator<SupervisorStatus> tasks)
{
this.it = getAuthorizedSupervisors(tasks, root);
}
@Override
public Iterator<Object[]> iterator()
{
throw new UnsupportedOperationException("Do not use iterator(), it cannot be closed.");
}
@Override
public Enumerator<Object[]> enumerator()
{
return new Enumerator<Object[]>()
{
@Override
public Object[] current()
{
final SupervisorStatus supervisor = it.next();
return new Object[]{
supervisor.getId(),
supervisor.getState(),
supervisor.getDetailedState(),
supervisor.isHealthy() ? 1L : 0L,
supervisor.getType(),
supervisor.getSource(),
supervisor.isSuspended() ? 1L : 0L,
supervisor.getSpecString()
};
}
@Override
public boolean moveNext()
{
return it.hasNext();
}
@Override
public void reset()
{
}
@Override
public void close()
{
try {
it.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
}
return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler));
}
private CloseableIterator<SupervisorStatus> getAuthorizedSupervisors(
JsonParserIterator<SupervisorStatus> it,
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator = supervisor -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
final Iterable<SupervisorStatus> authorizedSupervisors = AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
() -> it,
raGenerator,
authorizerMapper
);
return wrap(authorizedSupervisors.iterator(), it);
}
}
// Note that overlord must be up to get supervisor tasks, otherwise queries to sys.supervisors table
// will fail with internal server error (HTTP 500)
private static JsonParserIterator<SupervisorStatus> getSupervisors(
DruidLeaderClient indexingServiceClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler
)
{
Request request;
try {
request = indexingServiceClient.makeRequest(
HttpMethod.GET,
"/druid/indexer/v1/supervisor?system",
false
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SupervisorStatus>()
{
});
return new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
);
}
private static <T> CloseableIterator<T> wrap(Iterator<T> iterator, JsonParserIterator<T> it)
{
return new CloseableIterator<T>()

View File

@ -326,6 +326,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
.build()
);
@ -351,6 +352,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
.build()
);

View File

@ -453,10 +453,16 @@ public class SystemSchemaTest extends CalciteTestBase
@Test
public void testGetTableMap()
{
Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), schema.getTableNames());
Assert.assertEquals(
ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"),
schema.getTableNames()
);
final Map<String, Table> tableMap = schema.getTableMap();
Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), tableMap.keySet());
Assert.assertEquals(
ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"),
tableMap.keySet()
);
final SystemSchema.SegmentsTable segmentsTable = (SystemSchema.SegmentsTable) schema.getTableMap().get("segments");
final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
@ -1113,6 +1119,90 @@ public class SystemSchemaTest extends CalciteTestBase
verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);
}
@Test
public void testSupervisorTable() throws Exception
{
SystemSchema.SupervisorsTable supervisorTable = EasyMock.createMockBuilder(SystemSchema.SupervisorsTable.class)
.withConstructor(client, mapper, responseHandler, authMapper)
.createMock();
EasyMock.replay(supervisorTable);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false))
.andReturn(request)
.anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(request.getUrl())
.andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
.anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
String json = "[{\n"
+ "\t\"id\": \"wikipedia\",\n"
+ "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ "\t\"healthy\": false,\n"
+ "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ "\t\"type\": \"kafka\",\n"
+ "\t\"source\": \"wikipedia\",\n"
+ "\t\"suspended\": false\n"
+ "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite);
in.done();
future.set(in);
EasyMock.replay(client, request, responseHandler);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
Object[] row0 = rows.get(0);
Assert.assertEquals("wikipedia", row0[0].toString());
Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
Assert.assertEquals(0L, row0[3]);
Assert.assertEquals("kafka", row0[4].toString());
Assert.assertEquals("wikipedia", row0[5].toString());
Assert.assertEquals(0L, row0[6]);
Assert.assertEquals(
"{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
row0[7].toString()
);
// Verify value types.
verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
}
private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
{
final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl());

View File

@ -1390,6 +1390,7 @@ avg_num_rows
avg_size
created_time
current_size
detailed_state
druid.server.maxSize
druid.server.tier
druid.sql.planner.maxSemiJoinRowsInMemory
@ -1416,6 +1417,7 @@ runner_status
segment_id
server_type
sqlTimeZone
supervisor_id
sys
sys.segments
task_id