Show all server types in sys.servers table (#7654)

* update sys.servers table to show all servers

* update docs

* Fix integration test

* modify test query for batch integration test

* fix case in test queries

* make the server_type lowercase

* Apply suggestions from code review

Co-Authored-By: Himanshu <g.himanshu@gmail.com>

* Fix compilation from git suggestion

* fix unit test
This commit is contained in:
Surekha 2019-05-15 16:54:02 -07:00 committed by Himanshu
parent 0b739a5787
commit d3545f5086
7 changed files with 310 additions and 39 deletions

View File

@ -638,7 +638,7 @@ ORDER BY 2 DESC
```
### SERVERS table
Servers table lists all data servers(any server that hosts a segment). It includes both Historicals and Peons.
Servers table lists all discovered servers in the cluster.
|Column|Type|Notes|
|------|-----|-----|
@ -646,10 +646,10 @@ Servers table lists all data servers(any server that hosts a segment). It includ
|host|STRING|Hostname of the server|
|plaintext_port|LONG|Unsecured port of the server, or -1 if plaintext traffic is disabled|
|tls_port|LONG|TLS port of the server, or -1 if TLS is disabled|
|server_type|STRING|Type of Druid service. Possible values include: Historical, realtime and indexer_executor(Peon).|
|tier|STRING|Distribution tier see [druid.server.tier](#../configuration/index.html#Historical-General-Configuration)|
|current_size|LONG|Current size of segments in bytes on this server|
|max_size|LONG|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](#../configuration/index.html#Historical-General-Configuration)|
|server_type|STRING|Type of Druid service. Possible values include: COORDINATOR, OVERLORD, BROKER, ROUTER, HISTORICAL, MIDDLE_MANAGER or PEON.|
|tier|STRING|Distribution tier see [druid.server.tier](#../configuration/index.html#Historical-General-Configuration). Only valid for HISTORICAL type, for other types it's null|
|current_size|LONG|Current size of segments in bytes on this server. Only valid for HISTORICAL type, for other types it's 0|
|max_size|LONG|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](#../configuration/index.html#Historical-General-Configuration). Only valid for HISTORICAL type, for other types it's 0|
To retrieve information about all servers, use the query:

View File

@ -96,7 +96,7 @@ public class ITBasicAuthConfigurationTest
"SELECT * FROM sys.segments WHERE datasource IN ('auth_test')";
private static final String SYS_SCHEMA_SERVERS_QUERY =
"SELECT * FROM sys.servers";
"SELECT * FROM sys.servers WHERE tier IS NOT NULL";
private static final String SYS_SCHEMA_SERVER_SEGMENTS_QUERY =
"SELECT * FROM sys.server_segments WHERE segment_id LIKE 'auth_test%'";

View File

@ -11,11 +11,11 @@
},
{
"query": {
"query": "SELECT server_type FROM sys.servers"
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL"
},
"expectedResults": [
{
"server_type": "historical"
"server_type":"historical"
}
]
},

View File

@ -16,11 +16,11 @@
},
{
"query": {
"query": "SELECT server_type FROM sys.servers"
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL"
},
"expectedResults": [
{
"server_type": "historical"
"server_type":"historical"
}
]
}

View File

@ -48,13 +48,17 @@ import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
@ -75,6 +79,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -83,6 +88,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class SystemSchema extends AbstractSchema
{
@ -114,6 +120,10 @@ public class SystemSchema extends AbstractSchema
private static final long IS_OVERSHADOWED_FALSE = 0L;
private static final long IS_OVERSHADOWED_TRUE = 1L;
//defaults for SERVERS table
private static final long MAX_SERVER_SIZE = 0L;
private static final long CURRENT_SERVER_SIZE = 0L;
static final RowSignature SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("segment_id", ValueType.STRING)
@ -177,6 +187,7 @@ public class SystemSchema extends AbstractSchema
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
final @IndexingService DruidLeaderClient overlordDruidLeaderClient,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final ObjectMapper jsonMapper
)
{
@ -190,7 +201,7 @@ public class SystemSchema extends AbstractSchema
);
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, segmentsTable,
SERVERS_TABLE, new ServersTable(serverView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
);
@ -423,18 +434,21 @@ public class SystemSchema extends AbstractSchema
}
/**
* This table contains row per server. At this time it only contains the
* data servers (i.e. historicals and peons)
* This table contains row per server. It contains all the discovered servers in druid cluster.
* Some columns like tier and size are only applicable to historical nodes which contain segments.
*/
static class ServersTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
private final AuthorizerMapper authorizerMapper;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
public ServersTable(TimelineServerView serverView, AuthorizerMapper authorizerMapper)
public ServersTable(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
AuthorizerMapper authorizerMapper
)
{
this.serverView = serverView;
this.authorizerMapper = authorizerMapper;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
}
@Override
@ -452,26 +466,41 @@ public class SystemSchema extends AbstractSchema
@Override
public Enumerable<Object[]> scan(DataContext root)
{
final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
final Iterator<DiscoveryDruidNode> druidServers = getDruidServers(druidNodeDiscoveryProvider);
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
checkStateReadAccessForServers(authenticationResult, authorizerMapper);
final FluentIterable<Object[]> results = FluentIterable
.from(druidServers)
.transform(val -> new Object[]{
val.getHost(),
extractHost(val.getHost()),
(long) extractPort(val.getHostAndPort()),
(long) extractPort(val.getHostAndTlsPort()),
toStringOrNull(val.getType()),
val.getTier(),
val.getCurrSize(),
val.getMaxSize()
.from(() -> druidServers)
.transform(val -> {
boolean isDataNode = false;
final DruidNode node = val.getDruidNode();
if (val.getNodeType().equals(NodeType.HISTORICAL)) {
isDataNode = true;
}
return new Object[]{
node.getHostAndPortToUse(),
extractHost(node.getHost()),
(long) extractPort(node.getHostAndPort()),
(long) extractPort(node.getHostAndTlsPort()),
StringUtils.toLowerCase(toStringOrNull(val.getNodeType())),
isDataNode ? val.toDruidServer().getTier() : null,
isDataNode ? val.toDruidServer().getCurrSize() : CURRENT_SERVER_SIZE,
isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE
};
});
return Linq4j.asEnumerable(results);
}
private Iterator<DiscoveryDruidNode> getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider)
{
return Arrays.stream(NodeType.values())
.flatMap(nodeType -> druidNodeDiscoveryProvider.getForNodeType(nodeType).getAllNodes().stream())
.collect(Collectors.toList())
.iterator();
}
}
/**

View File

@ -39,7 +39,12 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@ -60,6 +65,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@ -137,6 +143,7 @@ public class SystemSchemaTest extends CalciteTestBase
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
private MetadataSegmentView metadataView;
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -237,6 +244,7 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema.start();
druidSchema.awaitInitialization();
metadataView = EasyMock.createMock(MetadataSegmentView.class);
druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
schema = new SystemSchema(
druidSchema,
metadataView,
@ -244,6 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.createStrictMock(AuthorizerMapper.class),
client,
client,
druidNodeDiscoveryProvider,
mapper
);
}
@ -349,6 +358,75 @@ public class SystemSchemaTest extends CalciteTestBase
final List<DataSegment> realtimeSegments = ImmutableList.of(segment2, segment4, segment5);
private final DiscoveryDruidNode coordinator = new DiscoveryDruidNode(
new DruidNode("s1", "localhost", false, 8081, null, true, false),
NodeType.COORDINATOR,
ImmutableMap.of()
);
private final DiscoveryDruidNode overlord = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8090, null, true, false),
NodeType.OVERLORD,
ImmutableMap.of()
);
private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode(
new DruidNode("s3", "localhost", false, 8082, null, true, false),
NodeType.BROKER,
ImmutableMap.of()
);
private final DiscoveryDruidNode broker2 = new DiscoveryDruidNode(
new DruidNode("s3", "brokerHost", false, 8082, null, true, false),
NodeType.BROKER,
ImmutableMap.of()
);
private final DiscoveryDruidNode router = new DiscoveryDruidNode(
new DruidNode("s4", "localhost", false, 8888, null, true, false),
NodeType.ROUTER,
ImmutableMap.of()
);
private final DiscoveryDruidNode historical1 = new DiscoveryDruidNode(
new DruidNode("s5", "localhost", false, 8083, null, true, false),
NodeType.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
);
private final DiscoveryDruidNode historical2 = new DiscoveryDruidNode(
new DruidNode("s5", "histHost", false, 8083, null, true, false),
NodeType.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
private final DiscoveryDruidNode middleManager = new DiscoveryDruidNode(
new DruidNode("s6", "mmHost", false, 8091, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
);
private final DiscoveryDruidNode peon1 = new DiscoveryDruidNode(
new DruidNode("s7", "localhost", false, 8080, null, true, false),
NodeType.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
);
private final DiscoveryDruidNode peon2 = new DiscoveryDruidNode(
new DruidNode("s7", "peonHost", false, 8080, null, true, false),
NodeType.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L,
@ -601,14 +679,52 @@ public class SystemSchemaTest extends CalciteTestBase
{
SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class)
.withConstructor(serverView, authMapper)
.withConstructor(druidNodeDiscoveryProvider, authMapper)
.createMock();
EasyMock.replay(serversTable);
final DruidNodeDiscovery coordinatorNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery overlordNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery brokerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery routerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery historicalNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery mmNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
final DruidNodeDiscovery peonNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
EasyMock.expect(serverView.getDruidServers())
.andReturn(immutableDruidServers)
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR))
.andReturn(coordinatorNodeDiscovery)
.once();
EasyMock.replay(serverView);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD))
.andReturn(overlordNodeDiscovery)
.once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER)).andReturn(brokerNodeDiscovery).once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.ROUTER)).andReturn(routerNodeDiscovery).once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.HISTORICAL))
.andReturn(historicalNodeDiscovery)
.once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(mmNodeDiscovery)
.once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(peonNodeDiscovery).once();
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once();
EasyMock.expect(brokerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(broker1, broker2)).once();
EasyMock.expect(routerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(router)).once();
EasyMock.expect(historicalNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(historical1, historical2)).once();
EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once();
EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
EasyMock.replay(druidNodeDiscoveryProvider);
EasyMock.replay(
coordinatorNodeDiscovery,
overlordNodeDiscovery,
brokerNodeDiscovery,
routerNodeDiscovery,
historicalNodeDiscovery,
mmNodeDiscovery,
peonNodeDiscovery
);
DataContext dataContext = new DataContext()
{
@Override
@ -636,18 +752,144 @@ public class SystemSchemaTest extends CalciteTestBase
}
};
final List<Object[]> rows = serversTable.scan(dataContext).toList();
Assert.assertEquals(2, rows.size());
Object[] row1 = rows.get(0);
Assert.assertEquals("localhost:0000", row1[0]);
Assert.assertEquals("realtime", row1[4].toString());
Object[] row2 = rows.get(1);
Assert.assertEquals("server2:1234", row2[0]);
Assert.assertEquals("historical", row2[4].toString());
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
Assert.assertEquals(10, rows.size());
verifyServerRow(
rows.get(0),
"brokerHost:8082",
"brokerHost",
8082,
-1,
"broker",
null,
0,
0
);
verifyServerRow(
rows.get(1),
"histHost:8083",
"histHost",
8083,
-1,
"historical",
"tier",
0,
1000
);
verifyServerRow(
rows.get(2),
"localhost:8080",
"localhost",
8080,
-1,
"peon",
null,
0,
0
);
verifyServerRow(
rows.get(3),
"localhost:8081",
"localhost",
8081,
-1,
"coordinator",
null,
0,
0
);
verifyServerRow(
rows.get(4),
"localhost:8082",
"localhost",
8082,
-1,
"broker",
null,
0,
0
);
verifyServerRow(
rows.get(5),
"localhost:8083",
"localhost",
8083,
-1,
"historical",
"tier",
0,
1000
);
verifyServerRow(
rows.get(6),
"localhost:8090",
"localhost",
8090,
-1,
"overlord",
null,
0,
0
);
verifyServerRow(
rows.get(7),
"localhost:8888",
"localhost",
8888,
-1,
"router",
null,
0,
0
);
verifyServerRow(
rows.get(8),
"mmHost:8091",
"mmHost",
8091,
-1,
"middle_manager",
null,
0,
0
);
verifyServerRow(
rows.get(9),
"peonHost:8080",
"peonHost",
8080,
-1,
"peon",
null,
0,
0
);
// Verify value types.
verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE);
}
private void verifyServerRow(
Object[] row,
String server,
String host,
long plaintextPort,
long tlsPort,
String serverType,
String tier,
long currSize,
long maxSize)
{
Assert.assertEquals(server, row[0].toString());
Assert.assertEquals(host, row[1]);
Assert.assertEquals(plaintextPort, row[2]);
Assert.assertEquals(tlsPort, row[3]);
Assert.assertEquals(serverType, row[4]);
Assert.assertEquals(tier, row[5]);
Assert.assertEquals(currSize, row[6]);
Assert.assertEquals(maxSize, row[7]);
}
@Test
public void testServerSegmentsTable()
{
@ -739,7 +981,6 @@ public class SystemSchemaTest extends CalciteTestBase
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
String json = "[{\n"
+ "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ "\t\"type\": \"index\",\n"

View File

@ -764,6 +764,7 @@ public class CalciteTests
TEST_AUTHORIZER_MAPPER,
druidLeaderClient,
druidLeaderClient,
EasyMock.createMock(DruidNodeDiscoveryProvider.class),
getJsonMapper()
);
return schema;