mirror of https://github.com/apache/druid.git
Fix currSize attribute of historical server type (#7706)
This commit is contained in:
parent
cbbce955de
commit
1fe0de1c96
|
@ -43,7 +43,9 @@ import org.apache.calcite.schema.ScannableTable;
|
|||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.schema.impl.AbstractSchema;
|
||||
import org.apache.calcite.schema.impl.AbstractTable;
|
||||
import org.apache.druid.client.DruidServer;
|
||||
import org.apache.druid.client.ImmutableDruidServer;
|
||||
import org.apache.druid.client.InventoryView;
|
||||
import org.apache.druid.client.JsonParserIterator;
|
||||
import org.apache.druid.client.TimelineServerView;
|
||||
import org.apache.druid.client.coordinator.Coordinator;
|
||||
|
@ -184,6 +186,7 @@ public class SystemSchema extends AbstractSchema
|
|||
final DruidSchema druidSchema,
|
||||
final MetadataSegmentView metadataView,
|
||||
final TimelineServerView serverView,
|
||||
final InventoryView serverInventoryView,
|
||||
final AuthorizerMapper authorizerMapper,
|
||||
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
|
||||
final @IndexingService DruidLeaderClient overlordDruidLeaderClient,
|
||||
|
@ -201,7 +204,7 @@ public class SystemSchema extends AbstractSchema
|
|||
);
|
||||
this.tableMap = ImmutableMap.of(
|
||||
SEGMENTS_TABLE, segmentsTable,
|
||||
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, authorizerMapper),
|
||||
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
|
||||
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
|
||||
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
|
||||
);
|
||||
|
@ -441,14 +444,17 @@ public class SystemSchema extends AbstractSchema
|
|||
{
|
||||
private final AuthorizerMapper authorizerMapper;
|
||||
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
|
||||
private final InventoryView serverInventoryView;
|
||||
|
||||
public ServersTable(
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
InventoryView serverInventoryView,
|
||||
AuthorizerMapper authorizerMapper
|
||||
)
|
||||
{
|
||||
this.authorizerMapper = authorizerMapper;
|
||||
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
|
||||
this.serverInventoryView = serverInventoryView;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -477,7 +483,10 @@ public class SystemSchema extends AbstractSchema
|
|||
.transform(val -> {
|
||||
boolean isDataNode = false;
|
||||
final DruidNode node = val.getDruidNode();
|
||||
long currHistoricalSize = 0;
|
||||
if (val.getNodeType().equals(NodeType.HISTORICAL)) {
|
||||
final DruidServer server = serverInventoryView.getInventoryValue(val.toDruidServer().getName());
|
||||
currHistoricalSize = server.getCurrSize();
|
||||
isDataNode = true;
|
||||
}
|
||||
return new Object[]{
|
||||
|
@ -487,7 +496,7 @@ public class SystemSchema extends AbstractSchema
|
|||
(long) extractPort(node.getHostAndTlsPort()),
|
||||
StringUtils.toLowerCase(toStringOrNull(val.getNodeType())),
|
||||
isDataNode ? val.toDruidServer().getTier() : null,
|
||||
isDataNode ? val.toDruidServer().getCurrSize() : CURRENT_SERVER_SIZE,
|
||||
isDataNode ? currHistoricalSize : CURRENT_SERVER_SIZE,
|
||||
isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE
|
||||
};
|
||||
});
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
|
|||
import org.apache.druid.client.DruidServer;
|
||||
import org.apache.druid.client.ImmutableDruidDataSource;
|
||||
import org.apache.druid.client.ImmutableDruidServer;
|
||||
import org.apache.druid.client.InventoryView;
|
||||
import org.apache.druid.client.ServerInventoryView;
|
||||
import org.apache.druid.client.TimelineServerView;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.discovery.DataNodeService;
|
||||
|
@ -144,6 +146,7 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
private static Closer resourceCloser;
|
||||
private MetadataSegmentView metadataView;
|
||||
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
|
||||
private InventoryView serverInventoryView;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
@ -245,10 +248,12 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
druidSchema.awaitInitialization();
|
||||
metadataView = EasyMock.createMock(MetadataSegmentView.class);
|
||||
druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
|
||||
schema = new SystemSchema(
|
||||
druidSchema,
|
||||
metadataView,
|
||||
serverView,
|
||||
serverInventoryView,
|
||||
EasyMock.createStrictMock(AuthorizerMapper.class),
|
||||
client,
|
||||
client,
|
||||
|
@ -425,8 +430,6 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
|
||||
);
|
||||
|
||||
|
||||
|
||||
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
|
||||
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
|
||||
1L,
|
||||
|
@ -679,7 +682,11 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
{
|
||||
|
||||
SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class)
|
||||
.withConstructor(druidNodeDiscoveryProvider, authMapper)
|
||||
.withConstructor(
|
||||
druidNodeDiscoveryProvider,
|
||||
serverInventoryView,
|
||||
authMapper
|
||||
)
|
||||
.createMock();
|
||||
EasyMock.replay(serversTable);
|
||||
final DruidNodeDiscovery coordinatorNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
|
@ -714,7 +721,14 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once();
|
||||
EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
|
||||
|
||||
EasyMock.replay(druidNodeDiscoveryProvider);
|
||||
final DruidServer server1 = EasyMock.createMock(DruidServer.class);
|
||||
EasyMock.expect(serverInventoryView.getInventoryValue(historical1.toDruidServer().getName())).andReturn(server1).once();
|
||||
EasyMock.expect(server1.getCurrSize()).andReturn(200L).once();
|
||||
final DruidServer server2 = EasyMock.createMock(DruidServer.class);
|
||||
EasyMock.expect(serverInventoryView.getInventoryValue(historical2.toDruidServer().getName())).andReturn(server2).once();
|
||||
EasyMock.expect(server2.getCurrSize()).andReturn(400L).once();
|
||||
|
||||
EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView, server1, server2);
|
||||
EasyMock.replay(
|
||||
coordinatorNodeDiscovery,
|
||||
overlordNodeDiscovery,
|
||||
|
@ -773,7 +787,7 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
-1,
|
||||
"historical",
|
||||
"tier",
|
||||
0,
|
||||
400,
|
||||
1000
|
||||
);
|
||||
verifyServerRow(
|
||||
|
@ -817,7 +831,7 @@ public class SystemSchemaTest extends CalciteTestBase
|
|||
-1,
|
||||
"historical",
|
||||
"tier",
|
||||
0,
|
||||
200,
|
||||
1000
|
||||
);
|
||||
verifyServerRow(
|
||||
|
|
|
@ -31,6 +31,7 @@ import com.google.inject.Key;
|
|||
import com.google.inject.Module;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
import org.apache.druid.client.BrokerSegmentWatcherConfig;
|
||||
import org.apache.druid.client.ServerInventoryView;
|
||||
import org.apache.druid.collections.CloseableStupidPool;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
|
@ -761,6 +762,7 @@ public class CalciteTests
|
|||
plannerConfig
|
||||
),
|
||||
new TestServerInventoryView(walker.getSegments()),
|
||||
EasyMock.createMock(ServerInventoryView.class),
|
||||
TEST_AUTHORIZER_MAPPER,
|
||||
druidLeaderClient,
|
||||
druidLeaderClient,
|
||||
|
|
Loading…
Reference in New Issue