diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index ae8ba9460fa..a004e80b810 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.calcite.DataContext; @@ -50,7 +51,6 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; @@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -88,9 +89,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; - private static final EmittingLogger log = new EmittingLogger(SystemSchema.class); - - private static final RowSignature SEGMENTS_SIGNATURE = RowSignature + static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() .add("segment_id", ValueType.STRING) .add("datasource", ValueType.STRING) @@ -98,7 +97,7 @@ public class SystemSchema extends AbstractSchema .add("end", ValueType.STRING) .add("size", ValueType.LONG) .add("version", ValueType.STRING) - .add("partition_num", ValueType.STRING) + .add("partition_num", ValueType.LONG) .add("num_replicas", ValueType.LONG) .add("num_rows", ValueType.LONG) .add("is_published", ValueType.LONG) @@ -107,25 +106,25 @@ public class SystemSchema extends AbstractSchema .add("payload", ValueType.STRING) .build(); - private static final RowSignature SERVERS_SIGNATURE = RowSignature + static final RowSignature SERVERS_SIGNATURE = RowSignature .builder() .add("server", ValueType.STRING) .add("host", ValueType.STRING) - .add("plaintext_port", ValueType.STRING) - .add("tls_port", ValueType.STRING) + .add("plaintext_port", ValueType.LONG) + .add("tls_port", ValueType.LONG) .add("server_type", ValueType.STRING) .add("tier", ValueType.STRING) .add("curr_size", ValueType.LONG) .add("max_size", ValueType.LONG) .build(); - private static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature + static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature .builder() .add("server", ValueType.STRING) .add("segment_id", ValueType.STRING) .build(); - private static final RowSignature TASKS_SIGNATURE = RowSignature + static final RowSignature TASKS_SIGNATURE = RowSignature .builder() .add("task_id", ValueType.STRING) .add("type", ValueType.STRING) @@ -137,8 +136,8 @@ public class SystemSchema extends AbstractSchema .add("duration", ValueType.LONG) .add("location", ValueType.STRING) .add("host", ValueType.STRING) - .add("plaintext_port", ValueType.STRING) - .add("tls_port", ValueType.STRING) + .add("plaintext_port", ValueType.LONG) + .add("tls_port", ValueType.LONG) .add("error_msg", ValueType.STRING) .build(); @@ -156,12 +155,24 @@ public class SystemSchema extends AbstractSchema { Preconditions.checkNotNull(serverView, "serverView"); BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); - this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), - SERVERS_TABLE, new ServersTable(serverView, authorizerMapper), - SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) - ); + this.tableMap = ImmutableMap.builder() + .put( + SEGMENTS_TABLE, + new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + ) + .put( + SERVERS_TABLE, + new ServersTable(serverView, authorizerMapper) + ) + .put( + SERVER_SEGMENTS_TABLE, + new ServerSegmentsTable(serverView, authorizerMapper) + ) + .put( + TASKS_TABLE, + new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + ) + .build(); } @Override @@ -248,11 +259,11 @@ public class SystemSchema extends AbstractSchema return new Object[]{ val.getIdentifier(), val.getDataSource(), - val.getInterval().getStart(), - val.getInterval().getEnd(), + val.getInterval().getStart().toString(), + val.getInterval().getEnd().toString(), val.getSize(), val.getVersion(), - val.getShardSpec().getPartitionNum(), + Long.valueOf(val.getShardSpec().getPartitionNum()), numReplicas, numRows, 1L, //is_published is true for published segments @@ -276,16 +287,17 @@ public class SystemSchema extends AbstractSchema if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) { return null; } + final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getIdentifier()); + final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas(); return new Object[]{ val.getKey().getIdentifier(), val.getKey().getDataSource(), - val.getKey().getInterval().getStart(), - val.getKey().getInterval().getEnd(), + val.getKey().getInterval().getStart().toString(), + val.getKey().getInterval().getEnd().toString(), val.getKey().getSize(), val.getKey().getVersion(), - val.getKey().getShardSpec().getPartitionNum(), - partialSegmentDataMap.get(val.getKey().getIdentifier()) == null ? 0L - : partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(), + Long.valueOf(val.getKey().getShardSpec().getPartitionNum()), + numReplicas, val.getValue().getNumRows(), val.getValue().isPublished(), val.getValue().isAvailable(), @@ -471,10 +483,10 @@ public class SystemSchema extends AbstractSchema .from(druidServers) .transform(val -> new Object[]{ val.getHost(), - val.getHost().split(":")[0], - val.getHostAndPort() == null ? -1 : val.getHostAndPort().split(":")[1], - val.getHostAndTlsPort() == null ? -1 : val.getHostAndTlsPort().split(":")[1], - val.getType(), + extractHost(val.getHost()), + (long) extractPort(val.getHostAndPort()), + (long) extractPort(val.getHostAndTlsPort()), + toStringOrNull(val.getType()), val.getTier(), val.getCurrSize(), val.getMaxSize() @@ -583,24 +595,33 @@ public class SystemSchema extends AbstractSchema @Override public Object[] current() { - TaskStatusPlus task = it.next(); - return new Object[]{task.getId(), - task.getType(), - task.getDataSource(), - task.getCreatedTime(), - task.getQueueInsertionTime(), - task.getStatusCode(), - task.getRunnerStatusCode(), - task.getDuration(), - task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort() - == -1 - ? task.getLocation() - .getPort() - : task.getLocation().getTlsPort()), - task.getLocation().getHost(), - task.getLocation().getPort(), - task.getLocation().getTlsPort(), - task.getErrorMsg()}; + final TaskStatusPlus task = it.next(); + final String hostAndPort; + + if (task.getLocation().getHost() == null) { + hostAndPort = null; + } else { + final int port = task.getLocation().getTlsPort() >= 0 + ? task.getLocation().getTlsPort() + : task.getLocation().getPort(); + + hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString(); + } + return new Object[]{ + task.getId(), + task.getType(), + task.getDataSource(), + toStringOrNull(task.getCreatedTime()), + toStringOrNull(task.getQueueInsertionTime()), + toStringOrNull(task.getStatusCode()), + toStringOrNull(task.getRunnerStatusCode()), + task.getDuration() == null ? 0L : task.getDuration(), + hostAndPort, + task.getLocation().getHost(), + (long) task.getLocation().getPort(), + (long) task.getLocation().getTlsPort(), + task.getErrorMsg() + }; } @Override @@ -632,7 +653,10 @@ public class SystemSchema extends AbstractSchema return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler)); } - private CloseableIterator getAuthorizedTasks(JsonParserIterator it, DataContext root) + private CloseableIterator getAuthorizedTasks( + JsonParserIterator it, + DataContext root + ) { final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); @@ -723,4 +747,32 @@ public class SystemSchema extends AbstractSchema }; } + @Nullable + private static String extractHost(@Nullable final String hostAndPort) + { + if (hostAndPort == null) { + return null; + } + + return HostAndPort.fromString(hostAndPort).getHostText(); + } + + private static int extractPort(@Nullable final String hostAndPort) + { + if (hostAndPort == null) { + return -1; + } + + return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1); + } + + @Nullable + private static String toStringOrNull(@Nullable final Object object) + { + if (object == null) { + return null; + } + + return object.toString(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 3537ad62bef..851adff9454 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -26,8 +26,6 @@ import com.google.common.util.concurrent.SettableFuture; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -39,8 +37,10 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; @@ -54,6 +54,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -64,6 +65,7 @@ import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; @@ -147,10 +149,10 @@ public class SystemSchemaTest extends CalciteTestBase responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class) .withConstructor() .addMockedMethod( - "handleResponse", - HttpResponse.class, - HttpResponseHandler.TrafficCop.class - ) + "handleResponse", + HttpResponse.class, + HttpResponseHandler.TrafficCop.class + ) .addMockedMethod("getStatus") .createMock(); request = EasyMock.createMock(Request.class); @@ -441,36 +443,34 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = segmentsTable.scan(dataContext); - Enumerator enumerator = rows.enumerator(); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row1 = enumerator.current(); + final List rows = segmentsTable.scan(dataContext).toList(); + + Assert.assertEquals(6, rows.size()); + + Object[] row0 = rows.get(0); //segment 6 is published and unavailable, num_replicas is 0 - Assert.assertEquals(1L, row1[9]); - Assert.assertEquals(0L, row1[7]); - Assert.assertEquals(0L, row1[8]); //numRows = 0 + Assert.assertEquals(1L, row0[9]); + Assert.assertEquals(0L, row0[7]); + Assert.assertEquals(0L, row0[8]); //numRows = 0 - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - - Object[] row5 = enumerator.current(); + Object[] row4 = rows.get(4); //segment 2 is published and has 2 replicas - Assert.assertEquals(1L, row5[9]); - Assert.assertEquals(2L, row5[7]); - Assert.assertEquals(3L, row5[8]); //numRows = 3 - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(false, enumerator.moveNext()); + Assert.assertEquals(1L, row4[9]); + Assert.assertEquals(2L, row4[7]); + Assert.assertEquals(3L, row4[8]); //numRows = 3 + // Verify value types. + verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE); } @Test public void testServersTable() { - SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class).withConstructor(serverView, authMapper).createMock(); + SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) + .withConstructor(serverView, authMapper) + .createMock(); EasyMock.replay(serversTable); EasyMock.expect(serverView.getDruidServers()) @@ -503,14 +503,17 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = serversTable.scan(dataContext); - Assert.assertEquals(2, rows.count()); - Object[] row1 = rows.first(); + final List rows = serversTable.scan(dataContext).toList(); + Assert.assertEquals(2, rows.size()); + Object[] row1 = rows.get(0); Assert.assertEquals("localhost:0000", row1[0]); Assert.assertEquals("realtime", row1[4].toString()); - Object[] row2 = rows.last(); + Object[] row2 = rows.get(1); Assert.assertEquals("server2:1234", row2[0]); Assert.assertEquals("historical", row2[4].toString()); + + // Verify value types. + verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE); } @Test @@ -550,8 +553,6 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = serverSegmentsTable.scan(dataContext); - Assert.assertEquals(5, rows.count()); //server_segments table is the join of servers and segments table // it will have 5 rows as follows @@ -561,34 +562,31 @@ public class SystemSchemaTest extends CalciteTestBase // server2:1234 | test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4) // server2:1234 | test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5) - Enumerator enumerator = rows.enumerator(); - Assert.assertEquals(true, enumerator.moveNext()); + final List rows = serverSegmentsTable.scan(dataContext).toList(); + Assert.assertEquals(5, rows.size()); - Object[] row1 = rows.first(); + Object[] row0 = rows.get(0); + Assert.assertEquals("localhost:0000", row0[0]); + Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row0[1]); + + Object[] row1 = rows.get(1); Assert.assertEquals("localhost:0000", row1[0]); - Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]); + Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row1[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row2 = enumerator.current(); - Assert.assertEquals("localhost:0000", row2[0]); - Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]); + Object[] row2 = rows.get(2); + Assert.assertEquals("server2:1234", row2[0]); + Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row3 = enumerator.current(); + Object[] row3 = rows.get(3); Assert.assertEquals("server2:1234", row3[0]); - Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]); + Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row4 = enumerator.current(); + Object[] row4 = rows.get(4); Assert.assertEquals("server2:1234", row4[0]); - Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]); + Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row5 = rows.last(); - Assert.assertEquals("server2:1234", row5[0]); - Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]); - - Assert.assertEquals(false, enumerator.moveNext()); + // Verify value types. + verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE); } @Test @@ -672,26 +670,78 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = tasksTable.scan(dataContext); - Enumerator enumerator = rows.enumerator(); + final List rows = tasksTable.scan(dataContext).toList(); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row1 = enumerator.current(); - Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row1[0].toString()); - Assert.assertEquals("FAILED", row1[5].toString()); - Assert.assertEquals("NONE", row1[6].toString()); - Assert.assertEquals(-1L, row1[7]); - Assert.assertEquals("testHost:1234", row1[8]); + Object[] row0 = rows.get(0); + Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString()); + Assert.assertEquals("FAILED", row0[5].toString()); + Assert.assertEquals("NONE", row0[6].toString()); + Assert.assertEquals(-1L, row0[7]); + Assert.assertEquals("testHost:1234", row0[8]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row2 = enumerator.current(); - Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row2[0].toString()); - Assert.assertEquals("RUNNING", row2[5].toString()); - Assert.assertEquals("RUNNING", row2[6].toString()); - Assert.assertEquals(null, row2[7]); - Assert.assertEquals("192.168.1.6:8100", row2[8]); + Object[] row1 = rows.get(1); + Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); + Assert.assertEquals("RUNNING", row1[5].toString()); + Assert.assertEquals("RUNNING", row1[6].toString()); + Assert.assertEquals(0L, row1[7]); + Assert.assertEquals("192.168.1.6:8100", row1[8]); - Assert.assertEquals(false, enumerator.moveNext()); + // Verify value types. + verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); } + private static void verifyTypes(final List rows, final RowSignature signature) + { + final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl()); + + for (Object[] row : rows) { + Assert.assertEquals(row.length, signature.getRowOrder().size()); + + for (int i = 0; i < row.length; i++) { + final Class expectedClass; + + final ValueType columnType = signature.getColumnType(signature.getRowOrder().get(i)); + final boolean nullable = rowType.getFieldList().get(i).getType().isNullable(); + + switch (columnType) { + case LONG: + expectedClass = Long.class; + break; + case FLOAT: + expectedClass = Float.class; + break; + case DOUBLE: + expectedClass = Double.class; + break; + case STRING: + expectedClass = String.class; + break; + default: + throw new IAE("Don't know what class to expect for valueType[%s]", columnType); + } + + if (nullable) { + Assert.assertTrue( + StringUtils.format( + "Column[%s] is a [%s] or null (was %s)", + signature.getRowOrder().get(i), + expectedClass.getName(), + row[i] == null ? null : row[i].getClass().getName() + ), + row[i] == null || expectedClass.isAssignableFrom(row[i].getClass()) + ); + } else { + Assert.assertTrue( + StringUtils.format( + "Column[%s] is a [%s] (was %s)", + signature.getRowOrder().get(i), + expectedClass.getName(), + row[i] == null ? null : row[i].getClass().getName() + ), + row[i] != null && expectedClass.isAssignableFrom(row[i].getClass()) + ); + } + } + } + } }