From d3545f508672cf6b5eb66c7c28483f5ab01813ec Mon Sep 17 00:00:00 2001 From: Surekha Date: Wed, 15 May 2019 16:54:02 -0700 Subject: [PATCH] Show all server types in sys.servers table (#7654) * update sys.servers table to show all servers * update docs * Fix integration test * modify test query for batch integration test * fix case in test queries * make the server_type lowercase * Apply suggestions from code review Co-Authored-By: Himanshu * Fix compilation from git suggestion * fix unit test --- docs/content/querying/sql.md | 10 +- .../ITBasicAuthConfigurationTest.java | 2 +- .../sys_segment_batch_index_queries.json | 4 +- .../queries/sys_segment_queries.json | 4 +- .../sql/calcite/schema/SystemSchema.java | 63 +++-- .../sql/calcite/schema/SystemSchemaTest.java | 265 +++++++++++++++++- .../druid/sql/calcite/util/CalciteTests.java | 1 + 7 files changed, 310 insertions(+), 39 deletions(-) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index bf7fb047a52..169fa471c84 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -638,7 +638,7 @@ ORDER BY 2 DESC ``` ### SERVERS table -Servers table lists all data servers(any server that hosts a segment). It includes both Historicals and Peons. +Servers table lists all discovered servers in the cluster. |Column|Type|Notes| |------|-----|-----| @@ -646,10 +646,10 @@ Servers table lists all data servers(any server that hosts a segment). It includ |host|STRING|Hostname of the server| |plaintext_port|LONG|Unsecured port of the server, or -1 if plaintext traffic is disabled| |tls_port|LONG|TLS port of the server, or -1 if TLS is disabled| -|server_type|STRING|Type of Druid service. Possible values include: Historical, realtime and indexer_executor(Peon).| -|tier|STRING|Distribution tier see [druid.server.tier](#../configuration/index.html#Historical-General-Configuration)| -|current_size|LONG|Current size of segments in bytes on this server| -|max_size|LONG|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](#../configuration/index.html#Historical-General-Configuration)| +|server_type|STRING|Type of Druid service. Possible values include: COORDINATOR, OVERLORD, BROKER, ROUTER, HISTORICAL, MIDDLE_MANAGER or PEON.| +|tier|STRING|Distribution tier see [druid.server.tier](#../configuration/index.html#Historical-General-Configuration). Only valid for HISTORICAL type, for other types it's null| +|current_size|LONG|Current size of segments in bytes on this server. Only valid for HISTORICAL type, for other types it's 0| +|max_size|LONG|Max size in bytes this server recommends to assign to segments see [druid.server.maxSize](#../configuration/index.html#Historical-General-Configuration). Only valid for HISTORICAL type, for other types it's 0| To retrieve information about all servers, use the query: diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java index 67e1f37007f..5913d7c70ad 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java @@ -96,7 +96,7 @@ public class ITBasicAuthConfigurationTest "SELECT * FROM sys.segments WHERE datasource IN ('auth_test')"; private static final String SYS_SCHEMA_SERVERS_QUERY = - "SELECT * FROM sys.servers"; + "SELECT * FROM sys.servers WHERE tier IS NOT NULL"; private static final String SYS_SCHEMA_SERVER_SEGMENTS_QUERY = "SELECT * FROM sys.server_segments WHERE segment_id LIKE 'auth_test%'"; diff --git a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json index b8e863b95f4..53e063a9c8c 100644 --- a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json +++ b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json @@ -11,11 +11,11 @@ }, { "query": { - "query": "SELECT server_type FROM sys.servers" + "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL" }, "expectedResults": [ { - "server_type": "historical" + "server_type":"historical" } ] }, diff --git a/integration-tests/src/test/resources/queries/sys_segment_queries.json b/integration-tests/src/test/resources/queries/sys_segment_queries.json index 8e074107135..48c702a369e 100644 --- a/integration-tests/src/test/resources/queries/sys_segment_queries.json +++ b/integration-tests/src/test/resources/queries/sys_segment_queries.json @@ -16,11 +16,11 @@ }, { "query": { - "query": "SELECT server_type FROM sys.servers" + "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL" }, "expectedResults": [ { - "server_type": "historical" + "server_type":"historical" } ] } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index e5cfa911c45..bf0e3b6ef8b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -48,13 +48,17 @@ import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; @@ -75,6 +79,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -83,6 +88,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; public class SystemSchema extends AbstractSchema { @@ -114,6 +120,10 @@ public class SystemSchema extends AbstractSchema private static final long IS_OVERSHADOWED_FALSE = 0L; private static final long IS_OVERSHADOWED_TRUE = 1L; + //defaults for SERVERS table + private static final long MAX_SERVER_SIZE = 0L; + private static final long CURRENT_SERVER_SIZE = 0L; + static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() .add("segment_id", ValueType.STRING) @@ -177,6 +187,7 @@ public class SystemSchema extends AbstractSchema final AuthorizerMapper authorizerMapper, final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, final @IndexingService DruidLeaderClient overlordDruidLeaderClient, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final ObjectMapper jsonMapper ) { @@ -190,7 +201,7 @@ public class SystemSchema extends AbstractSchema ); this.tableMap = ImmutableMap.of( SEGMENTS_TABLE, segmentsTable, - SERVERS_TABLE, new ServersTable(serverView, authorizerMapper), + SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) ); @@ -423,18 +434,21 @@ public class SystemSchema extends AbstractSchema } /** - * This table contains row per server. At this time it only contains the - * data servers (i.e. historicals and peons) + * This table contains row per server. It contains all the discovered servers in druid cluster. + * Some columns like tier and size are only applicable to historical nodes which contain segments. */ static class ServersTable extends AbstractTable implements ScannableTable { - private final TimelineServerView serverView; private final AuthorizerMapper authorizerMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - public ServersTable(TimelineServerView serverView, AuthorizerMapper authorizerMapper) + public ServersTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper + ) { - this.serverView = serverView; this.authorizerMapper = authorizerMapper; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; } @Override @@ -452,26 +466,41 @@ public class SystemSchema extends AbstractSchema @Override public Enumerable scan(DataContext root) { - final List druidServers = serverView.getDruidServers(); + final Iterator druidServers = getDruidServers(druidNodeDiscoveryProvider); final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); checkStateReadAccessForServers(authenticationResult, authorizerMapper); final FluentIterable results = FluentIterable - .from(druidServers) - .transform(val -> new Object[]{ - val.getHost(), - extractHost(val.getHost()), - (long) extractPort(val.getHostAndPort()), - (long) extractPort(val.getHostAndTlsPort()), - toStringOrNull(val.getType()), - val.getTier(), - val.getCurrSize(), - val.getMaxSize() + .from(() -> druidServers) + .transform(val -> { + boolean isDataNode = false; + final DruidNode node = val.getDruidNode(); + if (val.getNodeType().equals(NodeType.HISTORICAL)) { + isDataNode = true; + } + return new Object[]{ + node.getHostAndPortToUse(), + extractHost(node.getHost()), + (long) extractPort(node.getHostAndPort()), + (long) extractPort(node.getHostAndTlsPort()), + StringUtils.toLowerCase(toStringOrNull(val.getNodeType())), + isDataNode ? val.toDruidServer().getTier() : null, + isDataNode ? val.toDruidServer().getCurrSize() : CURRENT_SERVER_SIZE, + isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE + }; }); return Linq4j.asEnumerable(results); } + + private Iterator getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) + { + return Arrays.stream(NodeType.values()) + .flatMap(nodeType -> druidNodeDiscoveryProvider.getForNodeType(nodeType).getAllNodes().stream()) + .collect(Collectors.toList()) + .iterator(); + } } /** diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a942db4b556..2d8e1d6e279 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -39,7 +39,12 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; +import org.apache.druid.discovery.DataNodeService; +import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.DruidNodeDiscovery; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -60,6 +65,7 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; @@ -137,6 +143,7 @@ public class SystemSchemaTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; private MetadataSegmentView metadataView; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -237,6 +244,7 @@ public class SystemSchemaTest extends CalciteTestBase druidSchema.start(); druidSchema.awaitInitialization(); metadataView = EasyMock.createMock(MetadataSegmentView.class); + druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); schema = new SystemSchema( druidSchema, metadataView, @@ -244,6 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase EasyMock.createStrictMock(AuthorizerMapper.class), client, client, + druidNodeDiscoveryProvider, mapper ); } @@ -349,6 +358,75 @@ public class SystemSchemaTest extends CalciteTestBase final List realtimeSegments = ImmutableList.of(segment2, segment4, segment5); + private final DiscoveryDruidNode coordinator = new DiscoveryDruidNode( + new DruidNode("s1", "localhost", false, 8081, null, true, false), + NodeType.COORDINATOR, + ImmutableMap.of() + ); + + private final DiscoveryDruidNode overlord = new DiscoveryDruidNode( + new DruidNode("s2", "localhost", false, 8090, null, true, false), + NodeType.OVERLORD, + ImmutableMap.of() + ); + + private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode( + new DruidNode("s3", "localhost", false, 8082, null, true, false), + NodeType.BROKER, + ImmutableMap.of() + ); + + private final DiscoveryDruidNode broker2 = new DiscoveryDruidNode( + new DruidNode("s3", "brokerHost", false, 8082, null, true, false), + NodeType.BROKER, + ImmutableMap.of() + ); + + private final DiscoveryDruidNode router = new DiscoveryDruidNode( + new DruidNode("s4", "localhost", false, 8888, null, true, false), + NodeType.ROUTER, + ImmutableMap.of() + ); + + private final DiscoveryDruidNode historical1 = new DiscoveryDruidNode( + new DruidNode("s5", "localhost", false, 8083, null, true, false), + NodeType.HISTORICAL, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + ) + ); + + private final DiscoveryDruidNode historical2 = new DiscoveryDruidNode( + new DruidNode("s5", "histHost", false, 8083, null, true, false), + NodeType.HISTORICAL, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + ); + + private final DiscoveryDruidNode middleManager = new DiscoveryDruidNode( + new DruidNode("s6", "mmHost", false, 8091, null, true, false), + NodeType.MIDDLE_MANAGER, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)) + ); + + private final DiscoveryDruidNode peon1 = new DiscoveryDruidNode( + new DruidNode("s7", "localhost", false, 8080, null, true, false), + NodeType.PEON, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + ) + ); + + private final DiscoveryDruidNode peon2 = new DiscoveryDruidNode( + new DruidNode("s7", "peonHost", false, 8080, null, true, false), + NodeType.PEON, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + ); + + + private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer( new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), 1L, @@ -601,14 +679,52 @@ public class SystemSchemaTest extends CalciteTestBase { SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) - .withConstructor(serverView, authMapper) + .withConstructor(druidNodeDiscoveryProvider, authMapper) .createMock(); EasyMock.replay(serversTable); + final DruidNodeDiscovery coordinatorNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery overlordNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery brokerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery routerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery historicalNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery mmNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + final DruidNodeDiscovery peonNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(serverView.getDruidServers()) - .andReturn(immutableDruidServers) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR)) + .andReturn(coordinatorNodeDiscovery) .once(); - EasyMock.replay(serverView); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD)) + .andReturn(overlordNodeDiscovery) + .once(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER)).andReturn(brokerNodeDiscovery).once(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.ROUTER)).andReturn(routerNodeDiscovery).once(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.HISTORICAL)) + .andReturn(historicalNodeDiscovery) + .once(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) + .andReturn(mmNodeDiscovery) + .once(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(peonNodeDiscovery).once(); + + EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once(); + EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once(); + EasyMock.expect(brokerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(broker1, broker2)).once(); + EasyMock.expect(routerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(router)).once(); + EasyMock.expect(historicalNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(historical1, historical2)).once(); + EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once(); + EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once(); + + EasyMock.replay(druidNodeDiscoveryProvider); + EasyMock.replay( + coordinatorNodeDiscovery, + overlordNodeDiscovery, + brokerNodeDiscovery, + routerNodeDiscovery, + historicalNodeDiscovery, + mmNodeDiscovery, + peonNodeDiscovery + ); + DataContext dataContext = new DataContext() { @Override @@ -636,18 +752,144 @@ public class SystemSchemaTest extends CalciteTestBase } }; final List rows = serversTable.scan(dataContext).toList(); - Assert.assertEquals(2, rows.size()); - Object[] row1 = rows.get(0); - Assert.assertEquals("localhost:0000", row1[0]); - Assert.assertEquals("realtime", row1[4].toString()); - Object[] row2 = rows.get(1); - Assert.assertEquals("server2:1234", row2[0]); - Assert.assertEquals("historical", row2[4].toString()); + rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); + Assert.assertEquals(10, rows.size()); + verifyServerRow( + rows.get(0), + "brokerHost:8082", + "brokerHost", + 8082, + -1, + "broker", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(1), + "histHost:8083", + "histHost", + 8083, + -1, + "historical", + "tier", + 0, + 1000 + ); + verifyServerRow( + rows.get(2), + "localhost:8080", + "localhost", + 8080, + -1, + "peon", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(3), + "localhost:8081", + "localhost", + 8081, + -1, + "coordinator", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(4), + "localhost:8082", + "localhost", + 8082, + -1, + "broker", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(5), + "localhost:8083", + "localhost", + 8083, + -1, + "historical", + "tier", + 0, + 1000 + ); + verifyServerRow( + rows.get(6), + "localhost:8090", + "localhost", + 8090, + -1, + "overlord", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(7), + "localhost:8888", + "localhost", + 8888, + -1, + "router", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(8), + "mmHost:8091", + "mmHost", + 8091, + -1, + "middle_manager", + null, + 0, + 0 + ); + verifyServerRow( + rows.get(9), + "peonHost:8080", + "peonHost", + 8080, + -1, + "peon", + null, + 0, + 0 + ); // Verify value types. verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE); } + private void verifyServerRow( + Object[] row, + String server, + String host, + long plaintextPort, + long tlsPort, + String serverType, + String tier, + long currSize, + long maxSize) + { + Assert.assertEquals(server, row[0].toString()); + Assert.assertEquals(host, row[1]); + Assert.assertEquals(plaintextPort, row[2]); + Assert.assertEquals(tlsPort, row[3]); + Assert.assertEquals(serverType, row[4]); + Assert.assertEquals(tier, row[5]); + Assert.assertEquals(currSize, row[6]); + Assert.assertEquals(maxSize, row[7]); + } + @Test public void testServerSegmentsTable() { @@ -739,7 +981,6 @@ public class SystemSchemaTest extends CalciteTestBase AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - String json = "[{\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + "\t\"type\": \"index\",\n" diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 8e8934a3a16..633799e8f71 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -764,6 +764,7 @@ public class CalciteTests TEST_AUTHORIZER_MAPPER, druidLeaderClient, druidLeaderClient, + EasyMock.createMock(DruidNodeDiscoveryProvider.class), getJsonMapper() ); return schema;