diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index bf0e3b6ef8b..f863104ff85 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -43,7 +43,9 @@ import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.InventoryView; import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; @@ -184,6 +186,7 @@ public class SystemSchema extends AbstractSchema final DruidSchema druidSchema, final MetadataSegmentView metadataView, final TimelineServerView serverView, + final InventoryView serverInventoryView, final AuthorizerMapper authorizerMapper, final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, final @IndexingService DruidLeaderClient overlordDruidLeaderClient, @@ -201,7 +204,7 @@ public class SystemSchema extends AbstractSchema ); this.tableMap = ImmutableMap.of( SEGMENTS_TABLE, segmentsTable, - SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, authorizerMapper), + SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) ); @@ -441,14 +444,17 @@ public class SystemSchema extends AbstractSchema { private final AuthorizerMapper authorizerMapper; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final InventoryView serverInventoryView; public ServersTable( DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + InventoryView serverInventoryView, AuthorizerMapper authorizerMapper ) { this.authorizerMapper = authorizerMapper; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.serverInventoryView = serverInventoryView; } @Override @@ -477,7 +483,10 @@ public class SystemSchema extends AbstractSchema .transform(val -> { boolean isDataNode = false; final DruidNode node = val.getDruidNode(); + long currHistoricalSize = 0; if (val.getNodeType().equals(NodeType.HISTORICAL)) { + final DruidServer server = serverInventoryView.getInventoryValue(val.toDruidServer().getName()); + currHistoricalSize = server.getCurrSize(); isDataNode = true; } return new Object[]{ @@ -487,7 +496,7 @@ public class SystemSchema extends AbstractSchema (long) extractPort(node.getHostAndTlsPort()), StringUtils.toLowerCase(toStringOrNull(val.getNodeType())), isDataNode ? val.toDruidServer().getTier() : null, - isDataNode ? val.toDruidServer().getCurrSize() : CURRENT_SERVER_SIZE, + isDataNode ? currHistoricalSize : CURRENT_SERVER_SIZE, isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE }; }); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 2d8e1d6e279..49e406b0e6e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.InventoryView; +import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DataNodeService; @@ -144,6 +146,7 @@ public class SystemSchemaTest extends CalciteTestBase private static Closer resourceCloser; private MetadataSegmentView metadataView; private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private InventoryView serverInventoryView; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -245,10 +248,12 @@ public class SystemSchemaTest extends CalciteTestBase druidSchema.awaitInitialization(); metadataView = EasyMock.createMock(MetadataSegmentView.class); druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + serverInventoryView = EasyMock.createMock(ServerInventoryView.class); schema = new SystemSchema( druidSchema, metadataView, serverView, + serverInventoryView, EasyMock.createStrictMock(AuthorizerMapper.class), client, client, @@ -425,8 +430,6 @@ public class SystemSchemaTest extends CalciteTestBase DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) ); - - private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer( new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), 1L, @@ -679,7 +682,11 @@ public class SystemSchemaTest extends CalciteTestBase { SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) - .withConstructor(druidNodeDiscoveryProvider, authMapper) + .withConstructor( + druidNodeDiscoveryProvider, + serverInventoryView, + authMapper + ) .createMock(); EasyMock.replay(serversTable); final DruidNodeDiscovery coordinatorNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); @@ -714,7 +721,14 @@ public class SystemSchemaTest extends CalciteTestBase EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once(); EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once(); - EasyMock.replay(druidNodeDiscoveryProvider); + final DruidServer server1 = EasyMock.createMock(DruidServer.class); + EasyMock.expect(serverInventoryView.getInventoryValue(historical1.toDruidServer().getName())).andReturn(server1).once(); + EasyMock.expect(server1.getCurrSize()).andReturn(200L).once(); + final DruidServer server2 = EasyMock.createMock(DruidServer.class); + EasyMock.expect(serverInventoryView.getInventoryValue(historical2.toDruidServer().getName())).andReturn(server2).once(); + EasyMock.expect(server2.getCurrSize()).andReturn(400L).once(); + + EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView, server1, server2); EasyMock.replay( coordinatorNodeDiscovery, overlordNodeDiscovery, @@ -773,7 +787,7 @@ public class SystemSchemaTest extends CalciteTestBase -1, "historical", "tier", - 0, + 400, 1000 ); verifyServerRow( @@ -817,7 +831,7 @@ public class SystemSchemaTest extends CalciteTestBase -1, "historical", "tier", - 0, + 200, 1000 ); verifyServerRow( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 633799e8f71..590205ea28f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -31,6 +31,7 @@ import com.google.inject.Key; import com.google.inject.Module; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.druid.client.BrokerSegmentWatcherConfig; +import org.apache.druid.client.ServerInventoryView; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.data.input.InputRow; @@ -761,6 +762,7 @@ public class CalciteTests plannerConfig ), new TestServerInventoryView(walker.getSegments()), + EasyMock.createMock(ServerInventoryView.class), TEST_AUTHORIZER_MAPPER, druidLeaderClient, druidLeaderClient,