Fix sys.servers table to not throw NPE and handle brokers/indexers/peons properly for broadcast segments (#10183)

* Fix sys.servers table to not throw NPE and handle brokers/indexers/peons properly for broadcast segments

* fix tests and add missing tests

* revert null handling fix

* unused import

* move out util methods from DiscoveryDruidNode
Jihoon Son 2020-07-21 17:52:51 -07:00 committed by GitHub
parent d4bd6e5207
commit 26d099f39b
7 changed files with 338 additions and 267 deletions
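
For context on the NPE itself: before this patch, ServersTable.scan converted every discovered node through DiscoveryDruidNode.toDruidServer() and dereferenced the result of serverInventoryView.getInventoryValue(...) unconditionally, so a historical that had not (yet) announced itself into the inventory view crashed the query, and brokers, indexers, and peons serving broadcast segments were never treated as data servers at all. A minimal sketch of the null-handling change (not the literal patched code; it assumes only the Druid classes imported below):

import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InventoryView;
import org.apache.druid.discovery.DiscoveryDruidNode;

class CurrentSizeSketch
{
  private static final long UNKNOWN_SIZE = 0L;

  static long currentSize(InventoryView inventoryView, DiscoveryDruidNode node)
  {
    // Before the patch: the inventory entry was dereferenced unconditionally,
    // throwing an NPE for any historical missing from the inventory view.
    final DruidServer fromInventory =
        inventoryView.getInventoryValue(node.getDruidNode().getHostAndPortToUse());
    // After the patch: a missing entry yields an "unknown" size (0, kept for
    // backwards compatibility) instead of an NPE.
    return fromInventory == null ? UNKNOWN_SIZE : fromInventory.getCurrSize();
  }
}

The same fallback-to-unknown idea appears below as UNKNOWN_SIZE in the patched ServersTable.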

TaskLocation.java

@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.Objects;
public class TaskLocation
@@ -44,7 +45,7 @@ public class TaskLocation
@JsonCreator
public TaskLocation(
@JsonProperty("host") String host,
@JsonProperty("host") @Nullable String host,
@JsonProperty("port") int port,
@JsonProperty("tlsPort") int tlsPort
)
@@ -54,6 +55,7 @@ public class TaskLocation
this.tlsPort = tlsPort;
}
@Nullable
@JsonProperty
public String getHost()
{

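With host now explicitly @Nullable (a task that has not been assigned a location reports no host), callers must guard before rendering a host:port pair, which is exactly what the sys.tasks change later in this diff does. A minimal sketch of that guard, assuming Guava's HostAndPort and the convention that a negative tlsPort means "unset" (the truncated hunk further below elides the exact condition):

import com.google.common.net.HostAndPort;
import javax.annotation.Nullable;
import org.apache.druid.indexer.TaskLocation;

class TaskHostSketch
{
  @Nullable
  static String hostAndPortOrNull(TaskLocation location)
  {
    final String host = location.getHost(); // @Nullable as of this patch
    if (host == null) {
      return null; // no location assigned yet, so there is nothing to render
    }
    // Prefer the TLS port when it is set, mirroring the sys.tasks logic below.
    final int port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort();
    return HostAndPort.fromParts(host, port).toString();
  }
}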
AbstractTestQueryHelper.java

@@ -63,10 +63,24 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
this.routerTLS = config.getRouterTLSUrl();
}
public abstract void testQueriesFromFile(String filePath, int timesToRun) throws Exception;
public abstract String getQueryURL(String schemeAndHost);
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
{
testQueriesFromFile(getQueryURL(broker), filePath, timesToRun);
testQueriesFromFile(getQueryURL(brokerTLS), filePath, timesToRun);
testQueriesFromFile(getQueryURL(router), filePath, timesToRun);
testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun);
}
public void testQueriesFromString(String str, int timesToRun) throws Exception
{
testQueriesFromString(getQueryURL(broker), str, timesToRun);
testQueriesFromString(getQueryURL(brokerTLS), str, timesToRun);
testQueriesFromString(getQueryURL(router), str, timesToRun);
testQueriesFromString(getQueryURL(routerTLS), str, timesToRun);
}
public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
{
LOG.info("Starting query tests for [%s]", filePath);

SqlTestQueryHelper.java

@@ -38,15 +38,6 @@ public class SqlTestQueryHelper extends AbstractTestQueryHelper<SqlQueryWithResu
super(jsonMapper, sqlClient, config);
}
@Override
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
{
testQueriesFromFile(getQueryURL(broker), filePath, timesToRun);
testQueriesFromFile(getQueryURL(brokerTLS), filePath, timesToRun);
testQueriesFromFile(getQueryURL(router), filePath, timesToRun);
testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun);
}
@Override
public String getQueryURL(String schemeAndHost)
{

TestQueryHelper.java

@@ -38,24 +38,6 @@ public class TestQueryHelper extends AbstractTestQueryHelper<QueryWithResults>
super(jsonMapper, queryClient, config);
}
@Override
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
{
testQueriesFromFile(getQueryURL(broker), filePath, timesToRun);
testQueriesFromFile(getQueryURL(brokerTLS), filePath, timesToRun);
testQueriesFromFile(getQueryURL(router), filePath, timesToRun);
testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun);
}
public void testQueriesFromString(String str, int timesToRun) throws Exception
{
testQueriesFromString(getQueryURL(broker), str, timesToRun);
testQueriesFromString(getQueryURL(brokerTLS), str, timesToRun);
testQueriesFromString(getQueryURL(router), str, timesToRun);
testQueriesFromString(getQueryURL(routerTLS), str, timesToRun);
}
@Override
public String getQueryURL(String schemeAndHost)
{

DiscoveryDruidNode.java

@@ -21,7 +21,6 @@ package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.client.DruidServer;
import org.apache.druid.server.DruidNode;
import java.util.HashMap;
@@ -39,8 +38,12 @@ public class DiscoveryDruidNode
private final DruidNode druidNode;
private final NodeRole nodeRole;
// Other metadata associated with the node e.g.
// if its a historical node then lookup information, segment loading capacity etc.
/**
* Other metadata associated with the node e.g.
* if it's a historical node then lookup information, segment loading capacity etc.
*
* @see DruidNodeDiscoveryProvider#SERVICE_TO_NODE_TYPES
*/
private final Map<String, DruidService> services = new HashMap<>();
@JsonCreator
@@ -80,19 +83,6 @@ public class DiscoveryDruidNode
return druidNode;
}
public DruidServer toDruidServer()
{
return new DruidServer(
getDruidNode().getHostAndPortToUse(),
getDruidNode().getHostAndPort(),
getDruidNode().getHostAndTlsPort(),
((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(),
((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
);
}
@Override
public boolean equals(Object o)
{

SystemSchema.java

@@ -49,12 +49,15 @@ import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.http.client.Request;
@@ -122,10 +125,6 @@ public class SystemSchema extends AbstractSchema
private static final long IS_OVERSHADOWED_FALSE = 0L;
private static final long IS_OVERSHADOWED_TRUE = 1L;
//defaults for SERVERS table
private static final long MAX_SERVER_SIZE = 0L;
private static final long CURRENT_SERVER_SIZE = 0L;
static final RowSignature SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("segment_id", ValueType.STRING)
@@ -211,14 +210,8 @@
{
Preconditions.checkNotNull(serverView, "serverView");
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
final SegmentsTable segmentsTable = new SegmentsTable(
druidSchema,
metadataView,
jsonMapper,
authorizerMapper
);
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, segmentsTable,
SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
@@ -238,20 +231,17 @@
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final MetadataSegmentView metadataView;
public SegmentsTable(
DruidSchema druidSchema,
MetadataSegmentView metadataView,
ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper
)
{
this.druidSchema = druidSchema;
this.metadataView = metadataView;
this.jsonMapper = jsonMapper;
this.authorizerMapper = authorizerMapper;
}
@@ -311,7 +301,7 @@
segment.getInterval().getEnd().toString(),
segment.getSize(),
segment.getVersion(),
Long.valueOf(segment.getShardSpec().getPartitionNum()),
(long) segment.getShardSpec().getPartitionNum(),
numReplicas,
numRows,
IS_PUBLISHED_TRUE, //is_published is true for published segments
@@ -369,8 +359,10 @@
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils
.filterAuthorizedResources(
@@ -387,8 +379,10 @@
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
Function<Entry<SegmentId, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
@@ -455,6 +449,13 @@
*/
static class ServersTable extends AbstractTable implements ScannableTable
{
// This is used for maxSize and currentSize when they are unknown.
// An unknown size doesn't have to be 0; null would be more accurate.
// However, this table has historically returned 0 for unknown sizes, and we keep that behavior for
// backwards compatibility. We may be able to return nulls instead once we remove the bindable query path,
// which is currently used to query system tables.
private static final long UNKNOWN_SIZE = 0L;
private final AuthorizerMapper authorizerMapper;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final InventoryView serverInventoryView;
@@ -486,37 +487,121 @@
public Enumerable<Object[]> scan(DataContext root)
{
final Iterator<DiscoveryDruidNode> druidServers = getDruidServers(druidNodeDiscoveryProvider);
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
checkStateReadAccessForServers(authenticationResult, authorizerMapper);
final FluentIterable<Object[]> results = FluentIterable
.from(() -> druidServers)
.transform((DiscoveryDruidNode val) -> {
boolean isDataNode = false;
final DruidNode node = val.getDruidNode();
long currHistoricalSize = 0;
if (val.getNodeRole().equals(NodeRole.HISTORICAL)) {
final DruidServer server = serverInventoryView.getInventoryValue(val.toDruidServer().getName());
currHistoricalSize = server.getCurrSize();
isDataNode = true;
.transform((DiscoveryDruidNode discoveryDruidNode) -> {
//noinspection ConstantConditions
final boolean isDiscoverableDataServer = isDiscoverableDataServer(discoveryDruidNode);
if (isDiscoverableDataServer) {
final DruidServer druidServer = serverInventoryView.getInventoryValue(
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
if (druidServer != null || discoveryDruidNode.getNodeRole().equals(NodeRole.HISTORICAL)) {
// Build a row for the data server if that server is in the server view, or if the node type is historical.
// Historicals are normally expected to be found in the server view; if one is missing, it may be having
// trouble announcing itself. In that case, we simply fill its status columns with nulls.
return buildRowForDiscoverableDataServer(discoveryDruidNode, druidServer);
} else {
return buildRowForNonDataServer(discoveryDruidNode);
}
} else {
return buildRowForNonDataServer(discoveryDruidNode);
}
return new Object[]{
node.getHostAndPortToUse(),
extractHost(node.getHost()),
(long) extractPort(node.getHostAndPort()),
(long) extractPort(node.getHostAndTlsPort()),
StringUtils.toLowerCase(toStringOrNull(val.getNodeRole())),
isDataNode ? val.toDruidServer().getTier() : null,
isDataNode ? currHistoricalSize : CURRENT_SERVER_SIZE,
isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE
};
});
return Linq4j.asEnumerable(results);
}
private Iterator<DiscoveryDruidNode> getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider)
/**
* Returns a row for any node type that doesn't serve data. The returned row contains only static information.
*/
private static Object[] buildRowForNonDataServer(DiscoveryDruidNode discoveryDruidNode)
{
final DruidNode node = discoveryDruidNode.getDruidNode();
return new Object[]{
node.getHostAndPortToUse(),
node.getHost(),
(long) node.getPlaintextPort(),
(long) node.getTlsPort(),
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
null,
UNKNOWN_SIZE,
UNKNOWN_SIZE
};
}
/**
* Returns a row for a discoverable data server. This method prefers the information from
* {@code serverFromInventoryView} when available, since it reflects the current state of the server.
* Otherwise, it falls back to {@code discoveryDruidNode}, which carries only static configuration.
*/
private static Object[] buildRowForDiscoverableDataServer(
DiscoveryDruidNode discoveryDruidNode,
@Nullable DruidServer serverFromInventoryView
)
{
final DruidNode node = discoveryDruidNode.getDruidNode();
final DruidServer druidServerToUse = serverFromInventoryView == null
? toDruidServer(discoveryDruidNode)
: serverFromInventoryView;
final long currentSize;
if (serverFromInventoryView == null) {
// If the server is missing from serverInventoryView, its currentSize is unknown
currentSize = UNKNOWN_SIZE;
} else {
currentSize = serverFromInventoryView.getCurrSize();
}
return new Object[]{
node.getHostAndPortToUse(),
node.getHost(),
(long) node.getPlaintextPort(),
(long) node.getTlsPort(),
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
druidServerToUse.getTier(),
currentSize,
druidServerToUse.getMaxSize()
};
}
private static boolean isDiscoverableDataServer(DiscoveryDruidNode druidNode)
{
final DruidService druidService = druidNode.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY);
if (druidService == null) {
return false;
}
final DataNodeService dataNodeService = (DataNodeService) druidService;
return dataNodeService.isDiscoverable();
}
private static DruidServer toDruidServer(DiscoveryDruidNode discoveryDruidNode)
{
if (isDiscoverableDataServer(discoveryDruidNode)) {
final DruidNode druidNode = discoveryDruidNode.getDruidNode();
final DataNodeService dataNodeService = (DataNodeService) discoveryDruidNode
.getServices()
.get(DataNodeService.DISCOVERY_SERVICE_KEY);
return new DruidServer(
druidNode.getHostAndPortToUse(),
druidNode.getHostAndPort(),
druidNode.getHostAndTlsPort(),
dataNodeService.getMaxSize(),
dataNodeService.getType(),
dataNodeService.getTier(),
dataNodeService.getPriority()
);
} else {
throw new ISE("[%s] is not a discoverable data server", discoveryDruidNode);
}
}
private static Iterator<DiscoveryDruidNode> getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider)
{
return Arrays.stream(NodeRole.values())
.flatMap(nodeRole -> druidNodeDiscoveryProvider.getForNodeRole(nodeRole).getAllNodes().stream())
@@ -554,9 +639,10 @@
@Override
public Enumerable<Object[]> scan(DataContext root)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
checkStateReadAccessForServers(authenticationResult, authorizerMapper);
final List<Object[]> rows = new ArrayList<>();
@@ -643,9 +729,10 @@
public Object[] current()
{
final TaskStatusPlus task = it.next();
final String hostAndPort;
@Nullable final String host = task.getLocation().getHost();
@Nullable final String hostAndPort;
if (task.getLocation().getHost() == null) {
if (host == null) {
hostAndPort = null;
} else {
final int port;
@@ -655,7 +742,7 @@
port = task.getLocation().getPort();
}
hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString();
hostAndPort = HostAndPort.fromParts(host, port).toString();
}
return new Object[]{
task.getId(),
@@ -668,7 +755,7 @@
toStringOrNull(task.getRunnerStatusCode()),
task.getDuration() == null ? 0L : task.getDuration(),
hostAndPort,
task.getLocation().getHost(),
host,
(long) task.getLocation().getPort(),
(long) task.getLocation().getTlsPort(),
task.getErrorMsg()
@@ -709,8 +796,10 @@
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = task -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource()));
@@ -873,8 +962,10 @@
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator = supervisor -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
@@ -961,25 +1052,6 @@
};
}
@Nullable
private static String extractHost(@Nullable final String hostAndPort)
{
if (hostAndPort == null) {
return null;
}
return HostAndPort.fromString(hostAndPort).getHostText();
}
private static int extractPort(@Nullable final String hostAndPort)
{
if (hostAndPort == null) {
return -1;
}
return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1);
}
@Nullable
private static String toStringOrNull(@Nullable final Object object)
{

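Taken together, the new helpers reduce the per-node decision to a single predicate: does the node announce a discoverable DataNodeService, and is it either present in the inventory view or a historical? A self-contained restatement of that branch as a sketch (not the patched method itself; it uses only the public Druid APIs that appear in this diff):

import javax.annotation.Nullable;
import org.apache.druid.client.DruidServer;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeRole;

class RowSelectionSketch
{
  /** True when ServersTable should emit tier/size columns for this node. */
  static boolean usesDataServerRow(DiscoveryDruidNode node, @Nullable DruidServer fromInventory)
  {
    final DruidService service = node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY);
    final boolean discoverableDataServer =
        service instanceof DataNodeService && ((DataNodeService) service).isDiscoverable();
    // Brokers with broadcast segments, indexers, and peons qualify once they show
    // up in the inventory view; historicals always get a data-server row, with an
    // unknown current size when they are absent from the view.
    return discoverableDataServer
           && (fromInventory != null || node.getNodeRole().equals(NodeRole.HISTORICAL));
  }
}

The test below exercises exactly these branches: a broker announcing a DataNodeService (broadcast segments), a historical missing from the inventory view, and plain nodes that report null tier and zero sizes.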
SystemSchemaTest.java

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import junitparams.converters.Nullable;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -81,6 +82,7 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -109,6 +111,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -384,6 +387,14 @@ public class SystemSchemaTest extends CalciteTestBase
ImmutableMap.of()
);
private final DiscoveryDruidNode brokerWithBroadcastSegments = new DiscoveryDruidNode(
new DruidNode("s3", "brokerHostWithBroadcastSegments", false, 8082, 8282, true, true),
NodeRole.BROKER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.BROKER, 0)
)
);
private final DiscoveryDruidNode router = new DiscoveryDruidNode(
new DruidNode("s4", "localhost", false, 8888, null, true, false),
NodeRole.ROUTER,
@@ -402,21 +413,29 @@
new DruidNode("s5", "histHost", false, 8083, null, true, false),
NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
);
private final DiscoveryDruidNode lameHistorical = new DiscoveryDruidNode(
new DruidNode("s5", "lameHost", false, 8083, null, true, false),
NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
);
private final DiscoveryDruidNode middleManager = new DiscoveryDruidNode(
new DruidNode("s6", "mmHost", false, 8091, null, true, false),
NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
ImmutableMap.of()
);
private final DiscoveryDruidNode peon1 = new DiscoveryDruidNode(
new DruidNode("s7", "localhost", false, 8080, null, true, false),
NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
)
);
@@ -424,14 +443,16 @@
new DruidNode("s7", "peonHost", false, 8080, null, true, false),
NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
)
);
private final DiscoveryDruidNode indexer = new DiscoveryDruidNode(
new DruidNode("s8", "indexerHost", false, 8091, null, true, false),
NodeRole.INDEXER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
)
);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
@@ -494,11 +515,7 @@
@Test
public void testSegmentsTable()
{
final SystemSchema.SegmentsTable segmentsTable = EasyMock
.createMockBuilder(SystemSchema.SegmentsTable.class)
.withConstructor(druidSchema, metadataView, mapper, authMapper)
.createMock();
EasyMock.replay(segmentsTable);
final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper);
final Set<SegmentWithOvershadowedStatus> publishedSegments = new HashSet<>(Arrays.asList(
new SegmentWithOvershadowedStatus(publishedSegment1, true),
new SegmentWithOvershadowedStatus(publishedSegment2, false),
@@ -730,25 +747,32 @@
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once();
EasyMock.expect(brokerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(broker1, broker2)).once();
EasyMock.expect(brokerNodeDiscovery.getAllNodes())
.andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments))
.once();
EasyMock.expect(routerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(router)).once();
EasyMock.expect(historicalNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(historical1, historical2)).once();
EasyMock.expect(historicalNodeDiscovery.getAllNodes())
.andReturn(ImmutableList.of(historical1, historical2, lameHistorical))
.once();
EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once();
EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once();
final DruidServer server1 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(serverInventoryView.getInventoryValue(historical1.toDruidServer().getName()))
.andReturn(server1)
final List<DruidServer> servers = new ArrayList<>();
servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier"));
servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier"));
servers.add(mockDataServer(peon1.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
servers.add(mockDataServer(peon2.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
servers.add(mockDataServer(broker1.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
servers.add(mockDataServer(broker2.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
servers.add(mockDataServer(indexer.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
servers.add(mockDataServer(brokerWithBroadcastSegments.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier"));
EasyMock.expect(serverInventoryView.getInventoryValue(lameHistorical.getDruidNode().getHostAndPortToUse()))
.andReturn(null)
.once();
EasyMock.expect(server1.getCurrSize()).andReturn(200L).once();
final DruidServer server2 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(serverInventoryView.getInventoryValue(historical2.toDruidServer().getName()))
.andReturn(server2)
.once();
EasyMock.expect(server2.getCurrSize()).andReturn(400L).once();
EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView, server1, server2);
EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView);
EasyMock.replay(servers.toArray(new Object[0]));
EasyMock.replay(
coordinatorNodeDiscovery,
overlordNodeDiscovery,
@@ -786,155 +810,151 @@
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
final List<Object[]> rows = serversTable.scan(dataContext).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
Assert.assertEquals(11, rows.size());
verifyServerRow(
rows.get(0),
"brokerHost:8082",
"brokerHost",
8082,
-1,
"broker",
null,
0,
0
final List<Object[]> expectedRows = new ArrayList<>();
expectedRows.add(
createExpectedRow(
"brokerHost:8082",
"brokerHost",
8082,
-1,
NodeRole.BROKER,
null,
0L,
0L
)
);
verifyServerRow(
rows.get(1),
"histHost:8083",
"histHost",
8083,
-1,
"historical",
"tier",
400,
1000
expectedRows.add(
createExpectedRow(
"brokerHostWithBroadcastSegments:8282",
"brokerHostWithBroadcastSegments",
8082,
8282,
NodeRole.BROKER,
"tier",
0L,
1000L
)
);
verifyServerRow(
rows.get(2),
"indexerHost:8091",
"indexerHost",
8091,
-1,
"indexer",
null,
0,
0
expectedRows.add(
createExpectedRow("histHost:8083", "histHost", 8083, -1, NodeRole.HISTORICAL, "tier", 400L, 1000L)
);
verifyServerRow(
rows.get(3),
"localhost:8080",
"localhost",
8080,
-1,
"peon",
null,
0,
0
expectedRows.add(
createExpectedRow("indexerHost:8091", "indexerHost", 8091, -1, NodeRole.INDEXER, "tier", 0L, 1000L)
);
verifyServerRow(
rows.get(4),
"localhost:8081",
"localhost",
8081,
-1,
"coordinator",
null,
0,
0
expectedRows.add(
createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L)
);
verifyServerRow(
rows.get(5),
"localhost:8082",
"localhost",
8082,
-1,
"broker",
null,
0,
0
expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L));
expectedRows.add(
createExpectedRow(
"localhost:8081",
"localhost",
8081,
-1,
NodeRole.COORDINATOR,
null,
0L,
0L
)
);
verifyServerRow(
rows.get(6),
"localhost:8083",
"localhost",
8083,
-1,
"historical",
"tier",
200,
1000
expectedRows.add(
createExpectedRow(
"localhost:8082",
"localhost",
8082,
-1,
NodeRole.BROKER,
null,
0L,
0L
)
);
verifyServerRow(
rows.get(7),
"localhost:8090",
"localhost",
8090,
-1,
"overlord",
null,
0,
0
expectedRows.add(
createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L)
);
verifyServerRow(
rows.get(8),
"localhost:8888",
"localhost",
8888,
-1,
"router",
null,
0,
0
expectedRows.add(
createExpectedRow(
"localhost:8090",
"localhost",
8090,
-1,
NodeRole.OVERLORD,
null,
0L,
0L
)
);
verifyServerRow(
rows.get(9),
"mmHost:8091",
"mmHost",
8091,
-1,
"middle_manager",
null,
0,
0
expectedRows.add(
createExpectedRow(
"localhost:8888",
"localhost",
8888,
-1,
NodeRole.ROUTER,
null,
0L,
0L
)
);
verifyServerRow(
rows.get(10),
"peonHost:8080",
"peonHost",
8080,
-1,
"peon",
null,
0,
0
expectedRows.add(
createExpectedRow(
"mmHost:8091",
"mmHost",
8091,
-1,
NodeRole.MIDDLE_MANAGER,
null,
0L,
0L
)
);
expectedRows.add(createExpectedRow("peonHost:8080", "peonHost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L));
Assert.assertEquals(expectedRows.size(), rows.size());
for (int i = 0; i < rows.size(); i++) {
Assert.assertArrayEquals(expectedRows.get(i), rows.get(i));
}
// Verify value types.
verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE);
}
private void verifyServerRow(
Object[] row,
private DruidServer mockDataServer(String name, long currentSize, long maxSize, String tier)
{
final DruidServer server = EasyMock.createMock(DruidServer.class);
EasyMock.expect(serverInventoryView.getInventoryValue(name))
.andReturn(server)
.once();
EasyMock.expect(server.getCurrSize()).andReturn(currentSize).once();
EasyMock.expect(server.getMaxSize()).andReturn(maxSize).once();
EasyMock.expect(server.getTier()).andReturn(tier).once();
return server;
}
private Object[] createExpectedRow(
String server,
String host,
long plaintextPort,
long tlsPort,
String serverType,
String tier,
long currSize,
long maxSize
int plaintextPort,
int tlsPort,
NodeRole nodeRole,
@Nullable String tier,
@Nullable Long currSize,
@Nullable Long maxSize
)
{
Assert.assertEquals(server, row[0].toString());
Assert.assertEquals(host, row[1]);
Assert.assertEquals(plaintextPort, row[2]);
Assert.assertEquals(tlsPort, row[3]);
Assert.assertEquals(serverType, row[4]);
Assert.assertEquals(tier, row[5]);
Assert.assertEquals(currSize, row[6]);
Assert.assertEquals(maxSize, row[7]);
return new Object[]{
server,
host,
(long) plaintextPort,
(long) tlsPort,
StringUtils.toLowerCase(nodeRole.toString()),
tier,
currSize,
maxSize
};
}
@Test