diff --git a/core/src/main/java/org/apache/druid/indexer/TaskLocation.java b/core/src/main/java/org/apache/druid/indexer/TaskLocation.java index bf99378ac97..a1dee807ba4 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskLocation.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskLocation.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; import java.util.Objects; public class TaskLocation @@ -44,7 +45,7 @@ public class TaskLocation @JsonCreator public TaskLocation( - @JsonProperty("host") String host, + @JsonProperty("host") @Nullable String host, @JsonProperty("port") int port, @JsonProperty("tlsPort") int tlsPort ) @@ -54,6 +55,7 @@ public class TaskLocation this.tlsPort = tlsPort; } + @Nullable @JsonProperty public String getHost() { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java index a84eda399dc..8cdad831e4b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java @@ -63,10 +63,24 @@ public abstract class AbstractTestQueryHelper super(jsonMapper, queryClient, config); } - @Override - public void testQueriesFromFile(String filePath, int timesToRun) throws Exception - { - testQueriesFromFile(getQueryURL(broker), filePath, timesToRun); - testQueriesFromFile(getQueryURL(brokerTLS), filePath, timesToRun); - testQueriesFromFile(getQueryURL(router), filePath, timesToRun); - testQueriesFromFile(getQueryURL(routerTLS), filePath, timesToRun); - } - - public void testQueriesFromString(String str, int timesToRun) throws Exception - { - testQueriesFromString(getQueryURL(broker), str, timesToRun); - testQueriesFromString(getQueryURL(brokerTLS), str, timesToRun); - testQueriesFromString(getQueryURL(router), str, timesToRun); - testQueriesFromString(getQueryURL(routerTLS), str, timesToRun); - } - - @Override public String getQueryURL(String schemeAndHost) { diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java index 1687a1fc55d..ff4ee7f46e1 100644 --- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java +++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java @@ -21,7 +21,6 @@ package org.apache.druid.discovery; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.client.DruidServer; import org.apache.druid.server.DruidNode; import java.util.HashMap; @@ -39,8 +38,12 @@ public class DiscoveryDruidNode private final DruidNode druidNode; private final NodeRole nodeRole; - // Other metadata associated with the node e.g. - // if its a historical node then lookup information, segment loading capacity etc. + /** + * Other metadata associated with the node e.g. + * if it's a historical node then lookup information, segment loading capacity etc. + * + * @see DruidNodeDiscoveryProvider#SERVICE_TO_NODE_TYPES + */ private final Map services = new HashMap<>(); @JsonCreator @@ -80,19 +83,6 @@ public class DiscoveryDruidNode return druidNode; } - public DruidServer toDruidServer() - { - return new DruidServer( - getDruidNode().getHostAndPortToUse(), - getDruidNode().getHostAndPort(), - getDruidNode().getHostAndTlsPort(), - ((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(), - ((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(), - ((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(), - ((DataNodeService) getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority() - ); - } - @Override public boolean equals(Object o) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 72633a1097e..afe47fda807 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -49,12 +49,15 @@ import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.DruidService; import org.apache.druid.discovery.NodeRole; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.http.client.Request; @@ -122,10 +125,6 @@ public class SystemSchema extends AbstractSchema private static final long IS_OVERSHADOWED_FALSE = 0L; private static final long IS_OVERSHADOWED_TRUE = 1L; - //defaults for SERVERS table - private static final long MAX_SERVER_SIZE = 0L; - private static final long CURRENT_SERVER_SIZE = 0L; - static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() .add("segment_id", ValueType.STRING) @@ -211,14 +210,8 @@ public class SystemSchema extends AbstractSchema { Preconditions.checkNotNull(serverView, "serverView"); BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); - final SegmentsTable segmentsTable = new SegmentsTable( - druidSchema, - metadataView, - jsonMapper, - authorizerMapper - ); this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, segmentsTable, + SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper), SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), @@ -238,20 +231,17 @@ public class SystemSchema extends AbstractSchema static class SegmentsTable extends AbstractTable implements ScannableTable { private final DruidSchema druidSchema; - private final ObjectMapper jsonMapper; private final AuthorizerMapper authorizerMapper; private final MetadataSegmentView metadataView; public SegmentsTable( DruidSchema druidSchemna, MetadataSegmentView metadataView, - ObjectMapper jsonMapper, AuthorizerMapper authorizerMapper ) { this.druidSchema = druidSchemna; this.metadataView = metadataView; - this.jsonMapper = jsonMapper; this.authorizerMapper = authorizerMapper; } @@ -311,7 +301,7 @@ public class SystemSchema extends AbstractSchema segment.getInterval().getEnd().toString(), segment.getSize(), segment.getVersion(), - Long.valueOf(segment.getShardSpec().getPartitionNum()), + (long) segment.getShardSpec().getPartitionNum(), numReplicas, numRows, IS_PUBLISHED_TRUE, //is_published is true for published segments @@ -369,8 +359,10 @@ public class SystemSchema extends AbstractSchema DataContext root ) { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); final Iterable authorizedSegments = AuthorizationUtils .filterAuthorizedResources( @@ -387,8 +379,10 @@ public class SystemSchema extends AbstractSchema DataContext root ) { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); Function, Iterable> raGenerator = segment -> Collections.singletonList( @@ -455,6 +449,13 @@ public class SystemSchema extends AbstractSchema */ static class ServersTable extends AbstractTable implements ScannableTable { + // This is used for maxSize and currentSize when they are unknown. + // The unknown size doesn't have to be 0, it's better to be null. + // However, this table is returning 0 for them for some reason and we keep the behavior for backwards compatibility. + // Maybe we can remove this and return nulls instead when we remove the bindable query path which is currently + // used to query system tables. + private static final long UNKNOWN_SIZE = 0L; + private final AuthorizerMapper authorizerMapper; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; private final InventoryView serverInventoryView; @@ -486,37 +487,121 @@ public class SystemSchema extends AbstractSchema public Enumerable scan(DataContext root) { final Iterator druidServers = getDruidServers(druidNodeDiscoveryProvider); - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); checkStateReadAccessForServers(authenticationResult, authorizerMapper); final FluentIterable results = FluentIterable .from(() -> druidServers) - .transform((DiscoveryDruidNode val) -> { - boolean isDataNode = false; - final DruidNode node = val.getDruidNode(); - long currHistoricalSize = 0; - if (val.getNodeRole().equals(NodeRole.HISTORICAL)) { - final DruidServer server = serverInventoryView.getInventoryValue(val.toDruidServer().getName()); - currHistoricalSize = server.getCurrSize(); - isDataNode = true; + .transform((DiscoveryDruidNode discoveryDruidNode) -> { + //noinspection ConstantConditions + final boolean isDiscoverableDataServer = isDiscoverableDataServer(discoveryDruidNode); + + if (isDiscoverableDataServer) { + final DruidServer druidServer = serverInventoryView.getInventoryValue( + discoveryDruidNode.getDruidNode().getHostAndPortToUse() + ); + if (druidServer != null || discoveryDruidNode.getNodeRole().equals(NodeRole.HISTORICAL)) { + // Build a row for the data server if that server is in the server view, or the node type is historical. + // The historicals are usually supposed to be found in the server view. If some historicals are + // missing, it could mean that there are some problems in them to announce themselves. We just fill + // their status with nulls in this case. + return buildRowForDiscoverableDataServer(discoveryDruidNode, druidServer); + } else { + return buildRowForNonDataServer(discoveryDruidNode); + } + } else { + return buildRowForNonDataServer(discoveryDruidNode); } - return new Object[]{ - node.getHostAndPortToUse(), - extractHost(node.getHost()), - (long) extractPort(node.getHostAndPort()), - (long) extractPort(node.getHostAndTlsPort()), - StringUtils.toLowerCase(toStringOrNull(val.getNodeRole())), - isDataNode ? val.toDruidServer().getTier() : null, - isDataNode ? currHistoricalSize : CURRENT_SERVER_SIZE, - isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE - }; }); return Linq4j.asEnumerable(results); } - private Iterator getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) + /** + * Returns a row for all node types which don't serve data. The returned row contains only static information. + */ + private static Object[] buildRowForNonDataServer(DiscoveryDruidNode discoveryDruidNode) + { + final DruidNode node = discoveryDruidNode.getDruidNode(); + return new Object[]{ + node.getHostAndPortToUse(), + node.getHost(), + (long) node.getPlaintextPort(), + (long) node.getTlsPort(), + StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()), + null, + UNKNOWN_SIZE, + UNKNOWN_SIZE + }; + } + + /** + * Returns a row for discoverable data server. This method prefers the information from + * {@code serverFromInventoryView} if available which is the current state of the server. Otherwise, it + * will get the information from {@code discoveryDruidNode} which has only static configurations. + */ + private static Object[] buildRowForDiscoverableDataServer( + DiscoveryDruidNode discoveryDruidNode, + @Nullable DruidServer serverFromInventoryView + ) + { + final DruidNode node = discoveryDruidNode.getDruidNode(); + final DruidServer druidServerToUse = serverFromInventoryView == null + ? toDruidServer(discoveryDruidNode) + : serverFromInventoryView; + final long currentSize; + if (serverFromInventoryView == null) { + // If server is missing in serverInventoryView, the currentSize should be unknown + currentSize = UNKNOWN_SIZE; + } else { + currentSize = serverFromInventoryView.getCurrSize(); + } + return new Object[]{ + node.getHostAndPortToUse(), + node.getHost(), + (long) node.getPlaintextPort(), + (long) node.getTlsPort(), + StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()), + druidServerToUse.getTier(), + currentSize, + druidServerToUse.getMaxSize() + }; + } + + private static boolean isDiscoverableDataServer(DiscoveryDruidNode druidNode) + { + final DruidService druidService = druidNode.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY); + if (druidService == null) { + return false; + } + final DataNodeService dataNodeService = (DataNodeService) druidService; + return dataNodeService.isDiscoverable(); + } + + private static DruidServer toDruidServer(DiscoveryDruidNode discoveryDruidNode) + { + if (isDiscoverableDataServer(discoveryDruidNode)) { + final DruidNode druidNode = discoveryDruidNode.getDruidNode(); + final DataNodeService dataNodeService = (DataNodeService) discoveryDruidNode + .getServices() + .get(DataNodeService.DISCOVERY_SERVICE_KEY); + return new DruidServer( + druidNode.getHostAndPortToUse(), + druidNode.getHostAndPort(), + druidNode.getHostAndTlsPort(), + dataNodeService.getMaxSize(), + dataNodeService.getType(), + dataNodeService.getTier(), + dataNodeService.getPriority() + ); + } else { + throw new ISE("[%s] is not a discoverable data server", discoveryDruidNode); + } + } + + private static Iterator getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) { return Arrays.stream(NodeRole.values()) .flatMap(nodeRole -> druidNodeDiscoveryProvider.getForNodeRole(nodeRole).getAllNodes().stream()) @@ -554,9 +639,10 @@ public class SystemSchema extends AbstractSchema @Override public Enumerable scan(DataContext root) { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); checkStateReadAccessForServers(authenticationResult, authorizerMapper); final List rows = new ArrayList<>(); @@ -643,9 +729,10 @@ public class SystemSchema extends AbstractSchema public Object[] current() { final TaskStatusPlus task = it.next(); - final String hostAndPort; + @Nullable final String host = task.getLocation().getHost(); + @Nullable final String hostAndPort; - if (task.getLocation().getHost() == null) { + if (host == null) { hostAndPort = null; } else { final int port; @@ -655,7 +742,7 @@ public class SystemSchema extends AbstractSchema port = task.getLocation().getPort(); } - hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString(); + hostAndPort = HostAndPort.fromParts(host, port).toString(); } return new Object[]{ task.getId(), @@ -668,7 +755,7 @@ public class SystemSchema extends AbstractSchema toStringOrNull(task.getRunnerStatusCode()), task.getDuration() == null ? 0L : task.getDuration(), hostAndPort, - task.getLocation().getHost(), + host, (long) task.getLocation().getPort(), (long) task.getLocation().getTlsPort(), task.getErrorMsg() @@ -709,8 +796,10 @@ public class SystemSchema extends AbstractSchema DataContext root ) { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); Function> raGenerator = task -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource())); @@ -873,8 +962,10 @@ public class SystemSchema extends AbstractSchema DataContext root ) { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); Function> raGenerator = supervisor -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource())); @@ -961,25 +1052,6 @@ public class SystemSchema extends AbstractSchema }; } - @Nullable - private static String extractHost(@Nullable final String hostAndPort) - { - if (hostAndPort == null) { - return null; - } - - return HostAndPort.fromString(hostAndPort).getHostText(); - } - - private static int extractPort(@Nullable final String hostAndPort) - { - if (hostAndPort == null) { - return -1; - } - - return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1); - } - @Nullable private static String toStringOrNull(@Nullable final Object object) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a0f3083c1de..08fe2ea5b70 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.SettableFuture; +import junitparams.converters.Nullable; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; @@ -81,6 +82,7 @@ import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable; import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; @@ -109,6 +111,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -384,6 +387,14 @@ public class SystemSchemaTest extends CalciteTestBase ImmutableMap.of() ); + private final DiscoveryDruidNode brokerWithBroadcastSegments = new DiscoveryDruidNode( + new DruidNode("s3", "brokerHostWithBroadcastSegments", false, 8082, 8282, true, true), + NodeRole.BROKER, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.BROKER, 0) + ) + ); + private final DiscoveryDruidNode router = new DiscoveryDruidNode( new DruidNode("s4", "localhost", false, 8888, null, true, false), NodeRole.ROUTER, @@ -402,21 +413,29 @@ public class SystemSchemaTest extends CalciteTestBase new DruidNode("s5", "histHost", false, 8083, null, true, false), NodeRole.HISTORICAL, ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + ) + ); + + private final DiscoveryDruidNode lameHistorical = new DiscoveryDruidNode( + new DruidNode("s5", "lameHost", false, 8083, null, true, false), + NodeRole.HISTORICAL, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + ) ); private final DiscoveryDruidNode middleManager = new DiscoveryDruidNode( new DruidNode("s6", "mmHost", false, 8091, null, true, false), NodeRole.MIDDLE_MANAGER, - ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)) + ImmutableMap.of() ); private final DiscoveryDruidNode peon1 = new DiscoveryDruidNode( new DruidNode("s7", "localhost", false, 8080, null, true, false), NodeRole.PEON, ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) ) ); @@ -424,14 +443,16 @@ public class SystemSchemaTest extends CalciteTestBase new DruidNode("s7", "peonHost", false, 8080, null, true, false), NodeRole.PEON, ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + ) ); private final DiscoveryDruidNode indexer = new DiscoveryDruidNode( new DruidNode("s8", "indexerHost", false, 8091, null, true, false), NodeRole.INDEXER, ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)) + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + ) ); private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer( @@ -494,11 +515,7 @@ public class SystemSchemaTest extends CalciteTestBase @Test public void testSegmentsTable() { - final SystemSchema.SegmentsTable segmentsTable = EasyMock - .createMockBuilder(SystemSchema.SegmentsTable.class) - .withConstructor(druidSchema, metadataView, mapper, authMapper) - .createMock(); - EasyMock.replay(segmentsTable); + final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper); final Set publishedSegments = new HashSet<>(Arrays.asList( new SegmentWithOvershadowedStatus(publishedSegment1, true), new SegmentWithOvershadowedStatus(publishedSegment2, false), @@ -730,25 +747,32 @@ public class SystemSchemaTest extends CalciteTestBase EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once(); EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once(); - EasyMock.expect(brokerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(broker1, broker2)).once(); + EasyMock.expect(brokerNodeDiscovery.getAllNodes()) + .andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments)) + .once(); EasyMock.expect(routerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(router)).once(); - EasyMock.expect(historicalNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(historical1, historical2)).once(); + EasyMock.expect(historicalNodeDiscovery.getAllNodes()) + .andReturn(ImmutableList.of(historical1, historical2, lameHistorical)) + .once(); EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once(); EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once(); EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once(); - final DruidServer server1 = EasyMock.createMock(DruidServer.class); - EasyMock.expect(serverInventoryView.getInventoryValue(historical1.toDruidServer().getName())) - .andReturn(server1) + final List servers = new ArrayList<>(); + servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier")); + servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier")); + servers.add(mockDataServer(peon1.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + servers.add(mockDataServer(peon2.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + servers.add(mockDataServer(broker1.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + servers.add(mockDataServer(broker2.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + servers.add(mockDataServer(indexer.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + servers.add(mockDataServer(brokerWithBroadcastSegments.getDruidNode().getHostAndPortToUse(), 0L, 1000L, "tier")); + EasyMock.expect(serverInventoryView.getInventoryValue(lameHistorical.getDruidNode().getHostAndPortToUse())) + .andReturn(null) .once(); - EasyMock.expect(server1.getCurrSize()).andReturn(200L).once(); - final DruidServer server2 = EasyMock.createMock(DruidServer.class); - EasyMock.expect(serverInventoryView.getInventoryValue(historical2.toDruidServer().getName())) - .andReturn(server2) - .once(); - EasyMock.expect(server2.getCurrSize()).andReturn(400L).once(); - EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView, server1, server2); + EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView); + EasyMock.replay(servers.toArray(new Object[0])); EasyMock.replay( coordinatorNodeDiscovery, overlordNodeDiscovery, @@ -786,155 +810,151 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; + final List rows = serversTable.scan(dataContext).toList(); rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); - Assert.assertEquals(11, rows.size()); - verifyServerRow( - rows.get(0), - "brokerHost:8082", - "brokerHost", - 8082, - -1, - "broker", - null, - 0, - 0 + + final List expectedRows = new ArrayList<>(); + expectedRows.add( + createExpectedRow( + "brokerHost:8082", + "brokerHost", + 8082, + -1, + NodeRole.BROKER, + null, + 0L, + 0L + ) ); - verifyServerRow( - rows.get(1), - "histHost:8083", - "histHost", - 8083, - -1, - "historical", - "tier", - 400, - 1000 + expectedRows.add( + createExpectedRow( + "brokerHostWithBroadcastSegments:8282", + "brokerHostWithBroadcastSegments", + 8082, + 8282, + NodeRole.BROKER, + "tier", + 0L, + 1000L + ) ); - verifyServerRow( - rows.get(2), - "indexerHost:8091", - "indexerHost", - 8091, - -1, - "indexer", - null, - 0, - 0 + expectedRows.add( + createExpectedRow("histHost:8083", "histHost", 8083, -1, NodeRole.HISTORICAL, "tier", 400L, 1000L) ); - verifyServerRow( - rows.get(3), - "localhost:8080", - "localhost", - 8080, - -1, - "peon", - null, - 0, - 0 + expectedRows.add( + createExpectedRow("indexerHost:8091", "indexerHost", 8091, -1, NodeRole.INDEXER, "tier", 0L, 1000L) ); - verifyServerRow( - rows.get(4), - "localhost:8081", - "localhost", - 8081, - -1, - "coordinator", - null, - 0, - 0 + expectedRows.add( + createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L) ); - verifyServerRow( - rows.get(5), - "localhost:8082", - "localhost", - 8082, - -1, - "broker", - null, - 0, - 0 + expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L)); + expectedRows.add( + createExpectedRow( + "localhost:8081", + "localhost", + 8081, + -1, + NodeRole.COORDINATOR, + null, + 0L, + 0L + ) ); - verifyServerRow( - rows.get(6), - "localhost:8083", - "localhost", - 8083, - -1, - "historical", - "tier", - 200, - 1000 + expectedRows.add( + createExpectedRow( + "localhost:8082", + "localhost", + 8082, + -1, + NodeRole.BROKER, + null, + 0L, + 0L + ) ); - verifyServerRow( - rows.get(7), - "localhost:8090", - "localhost", - 8090, - -1, - "overlord", - null, - 0, - 0 + expectedRows.add( + createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L) ); - verifyServerRow( - rows.get(8), - "localhost:8888", - "localhost", - 8888, - -1, - "router", - null, - 0, - 0 + expectedRows.add( + createExpectedRow( + "localhost:8090", + "localhost", + 8090, + -1, + NodeRole.OVERLORD, + null, + 0L, + 0L + ) ); - verifyServerRow( - rows.get(9), - "mmHost:8091", - "mmHost", - 8091, - -1, - "middle_manager", - null, - 0, - 0 + expectedRows.add( + createExpectedRow( + "localhost:8888", + "localhost", + 8888, + -1, + NodeRole.ROUTER, + null, + 0L, + 0L + ) ); - verifyServerRow( - rows.get(10), - "peonHost:8080", - "peonHost", - 8080, - -1, - "peon", - null, - 0, - 0 + expectedRows.add( + createExpectedRow( + "mmHost:8091", + "mmHost", + 8091, + -1, + NodeRole.MIDDLE_MANAGER, + null, + 0L, + 0L + ) ); + expectedRows.add(createExpectedRow("peonHost:8080", "peonHost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L)); + Assert.assertEquals(expectedRows.size(), rows.size()); + for (int i = 0; i < rows.size(); i++) { + Assert.assertArrayEquals(expectedRows.get(i), rows.get(i)); + } // Verify value types. verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE); } - private void verifyServerRow( - Object[] row, + private DruidServer mockDataServer(String name, long currentSize, long maxSize, String tier) + { + final DruidServer server = EasyMock.createMock(DruidServer.class); + EasyMock.expect(serverInventoryView.getInventoryValue(name)) + .andReturn(server) + .once(); + EasyMock.expect(server.getCurrSize()).andReturn(currentSize).once(); + EasyMock.expect(server.getMaxSize()).andReturn(maxSize).once(); + EasyMock.expect(server.getTier()).andReturn(tier).once(); + return server; + } + + private Object[] createExpectedRow( String server, String host, - long plaintextPort, - long tlsPort, - String serverType, - String tier, - long currSize, - long maxSize + int plaintextPort, + int tlsPort, + NodeRole nodeRole, + @Nullable String tier, + @Nullable Long currSize, + @Nullable Long maxSize ) { - Assert.assertEquals(server, row[0].toString()); - Assert.assertEquals(host, row[1]); - Assert.assertEquals(plaintextPort, row[2]); - Assert.assertEquals(tlsPort, row[3]); - Assert.assertEquals(serverType, row[4]); - Assert.assertEquals(tier, row[5]); - Assert.assertEquals(currSize, row[6]); - Assert.assertEquals(maxSize, row[7]); + return new Object[]{ + server, + host, + (long) plaintextPort, + (long) tlsPort, + StringUtils.toLowerCase(nodeRole.toString()), + tier, + currSize, + maxSize + }; } @Test