From e9c3d3e651dc6cfc571e8d3a1a5b7fd65fbb0840 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 18 Nov 2018 17:24:19 -0800 Subject: [PATCH] SystemSchema: Fix data types for various fields. (#6642) * SystemSchema: Fix data types for various fields. - segments: start, end, partition_num - servers: plaintext_port, tls_port - tasks: plaintext_port, tls_port The declared and actual types did not match, but they must or else queries may generate ClassCastExceptions. Also adjusted some of the code for generating values to be more robust in the face of nulls or malformed strings. * Fix style. --- .../sql/calcite/schema/SystemSchema.java | 150 +++++++++----- .../sql/calcite/schema/SystemSchemaTest.java | 186 +++++++++++------- 2 files changed, 219 insertions(+), 117 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index ae8ba9460fa..a004e80b810 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.calcite.DataContext; @@ -50,7 +51,6 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; @@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -88,9 +89,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; - private static final EmittingLogger log = new EmittingLogger(SystemSchema.class); - - private static final RowSignature SEGMENTS_SIGNATURE = RowSignature + static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() .add("segment_id", ValueType.STRING) .add("datasource", ValueType.STRING) @@ -98,7 +97,7 @@ public class SystemSchema extends AbstractSchema .add("end", ValueType.STRING) .add("size", ValueType.LONG) .add("version", ValueType.STRING) - .add("partition_num", ValueType.STRING) + .add("partition_num", ValueType.LONG) .add("num_replicas", ValueType.LONG) .add("num_rows", ValueType.LONG) .add("is_published", ValueType.LONG) @@ -107,25 +106,25 @@ public class SystemSchema extends AbstractSchema .add("payload", ValueType.STRING) .build(); - private static final RowSignature SERVERS_SIGNATURE = RowSignature + static final RowSignature SERVERS_SIGNATURE = RowSignature .builder() .add("server", ValueType.STRING) .add("host", ValueType.STRING) - .add("plaintext_port", ValueType.STRING) - .add("tls_port", ValueType.STRING) + .add("plaintext_port", ValueType.LONG) + .add("tls_port", ValueType.LONG) .add("server_type", ValueType.STRING) .add("tier", ValueType.STRING) .add("curr_size", ValueType.LONG) .add("max_size", ValueType.LONG) .build(); - private static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature + static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature .builder() .add("server", ValueType.STRING) .add("segment_id", ValueType.STRING) .build(); - private static final RowSignature TASKS_SIGNATURE = RowSignature + static final RowSignature TASKS_SIGNATURE = RowSignature .builder() .add("task_id", ValueType.STRING) .add("type", ValueType.STRING) @@ -137,8 +136,8 @@ public class SystemSchema extends AbstractSchema .add("duration", ValueType.LONG) .add("location", ValueType.STRING) .add("host", ValueType.STRING) - .add("plaintext_port", ValueType.STRING) - .add("tls_port", ValueType.STRING) + .add("plaintext_port", ValueType.LONG) + .add("tls_port", ValueType.LONG) .add("error_msg", ValueType.STRING) .build(); @@ -156,12 +155,24 @@ public class SystemSchema extends AbstractSchema { Preconditions.checkNotNull(serverView, "serverView"); BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); - this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), - SERVERS_TABLE, new ServersTable(serverView, authorizerMapper), - SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) - ); + this.tableMap = ImmutableMap.builder() + .put( + SEGMENTS_TABLE, + new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + ) + .put( + SERVERS_TABLE, + new ServersTable(serverView, authorizerMapper) + ) + .put( + SERVER_SEGMENTS_TABLE, + new ServerSegmentsTable(serverView, authorizerMapper) + ) + .put( + TASKS_TABLE, + new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + ) + .build(); } @Override @@ -248,11 +259,11 @@ public class SystemSchema extends AbstractSchema return new Object[]{ val.getIdentifier(), val.getDataSource(), - val.getInterval().getStart(), - val.getInterval().getEnd(), + val.getInterval().getStart().toString(), + val.getInterval().getEnd().toString(), val.getSize(), val.getVersion(), - val.getShardSpec().getPartitionNum(), + Long.valueOf(val.getShardSpec().getPartitionNum()), numReplicas, numRows, 1L, //is_published is true for published segments @@ -276,16 +287,17 @@ public class SystemSchema extends AbstractSchema if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) { return null; } + final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getIdentifier()); + final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas(); return new Object[]{ val.getKey().getIdentifier(), val.getKey().getDataSource(), - val.getKey().getInterval().getStart(), - val.getKey().getInterval().getEnd(), + val.getKey().getInterval().getStart().toString(), + val.getKey().getInterval().getEnd().toString(), val.getKey().getSize(), val.getKey().getVersion(), - val.getKey().getShardSpec().getPartitionNum(), - partialSegmentDataMap.get(val.getKey().getIdentifier()) == null ? 0L - : partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(), + Long.valueOf(val.getKey().getShardSpec().getPartitionNum()), + numReplicas, val.getValue().getNumRows(), val.getValue().isPublished(), val.getValue().isAvailable(), @@ -471,10 +483,10 @@ public class SystemSchema extends AbstractSchema .from(druidServers) .transform(val -> new Object[]{ val.getHost(), - val.getHost().split(":")[0], - val.getHostAndPort() == null ? -1 : val.getHostAndPort().split(":")[1], - val.getHostAndTlsPort() == null ? -1 : val.getHostAndTlsPort().split(":")[1], - val.getType(), + extractHost(val.getHost()), + (long) extractPort(val.getHostAndPort()), + (long) extractPort(val.getHostAndTlsPort()), + toStringOrNull(val.getType()), val.getTier(), val.getCurrSize(), val.getMaxSize() @@ -583,24 +595,33 @@ public class SystemSchema extends AbstractSchema @Override public Object[] current() { - TaskStatusPlus task = it.next(); - return new Object[]{task.getId(), - task.getType(), - task.getDataSource(), - task.getCreatedTime(), - task.getQueueInsertionTime(), - task.getStatusCode(), - task.getRunnerStatusCode(), - task.getDuration(), - task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort() - == -1 - ? task.getLocation() - .getPort() - : task.getLocation().getTlsPort()), - task.getLocation().getHost(), - task.getLocation().getPort(), - task.getLocation().getTlsPort(), - task.getErrorMsg()}; + final TaskStatusPlus task = it.next(); + final String hostAndPort; + + if (task.getLocation().getHost() == null) { + hostAndPort = null; + } else { + final int port = task.getLocation().getTlsPort() >= 0 + ? task.getLocation().getTlsPort() + : task.getLocation().getPort(); + + hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString(); + } + return new Object[]{ + task.getId(), + task.getType(), + task.getDataSource(), + toStringOrNull(task.getCreatedTime()), + toStringOrNull(task.getQueueInsertionTime()), + toStringOrNull(task.getStatusCode()), + toStringOrNull(task.getRunnerStatusCode()), + task.getDuration() == null ? 0L : task.getDuration(), + hostAndPort, + task.getLocation().getHost(), + (long) task.getLocation().getPort(), + (long) task.getLocation().getTlsPort(), + task.getErrorMsg() + }; } @Override @@ -632,7 +653,10 @@ public class SystemSchema extends AbstractSchema return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler)); } - private CloseableIterator getAuthorizedTasks(JsonParserIterator it, DataContext root) + private CloseableIterator getAuthorizedTasks( + JsonParserIterator it, + DataContext root + ) { final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); @@ -723,4 +747,32 @@ public class SystemSchema extends AbstractSchema }; } + @Nullable + private static String extractHost(@Nullable final String hostAndPort) + { + if (hostAndPort == null) { + return null; + } + + return HostAndPort.fromString(hostAndPort).getHostText(); + } + + private static int extractPort(@Nullable final String hostAndPort) + { + if (hostAndPort == null) { + return -1; + } + + return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1); + } + + @Nullable + private static String toStringOrNull(@Nullable final Object object) + { + if (object == null) { + return null; + } + + return object.toString(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 3537ad62bef..851adff9454 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -26,8 +26,6 @@ import com.google.common.util.concurrent.SettableFuture; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -39,8 +37,10 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; @@ -54,6 +54,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -64,6 +65,7 @@ import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; @@ -147,10 +149,10 @@ public class SystemSchemaTest extends CalciteTestBase responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class) .withConstructor() .addMockedMethod( - "handleResponse", - HttpResponse.class, - HttpResponseHandler.TrafficCop.class - ) + "handleResponse", + HttpResponse.class, + HttpResponseHandler.TrafficCop.class + ) .addMockedMethod("getStatus") .createMock(); request = EasyMock.createMock(Request.class); @@ -441,36 +443,34 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = segmentsTable.scan(dataContext); - Enumerator enumerator = rows.enumerator(); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row1 = enumerator.current(); + final List rows = segmentsTable.scan(dataContext).toList(); + + Assert.assertEquals(6, rows.size()); + + Object[] row0 = rows.get(0); //segment 6 is published and unavailable, num_replicas is 0 - Assert.assertEquals(1L, row1[9]); - Assert.assertEquals(0L, row1[7]); - Assert.assertEquals(0L, row1[8]); //numRows = 0 + Assert.assertEquals(1L, row0[9]); + Assert.assertEquals(0L, row0[7]); + Assert.assertEquals(0L, row0[8]); //numRows = 0 - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(true, enumerator.moveNext()); - - Object[] row5 = enumerator.current(); + Object[] row4 = rows.get(4); //segment 2 is published and has 2 replicas - Assert.assertEquals(1L, row5[9]); - Assert.assertEquals(2L, row5[7]); - Assert.assertEquals(3L, row5[8]); //numRows = 3 - Assert.assertEquals(true, enumerator.moveNext()); - Assert.assertEquals(false, enumerator.moveNext()); + Assert.assertEquals(1L, row4[9]); + Assert.assertEquals(2L, row4[7]); + Assert.assertEquals(3L, row4[8]); //numRows = 3 + // Verify value types. + verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE); } @Test public void testServersTable() { - SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class).withConstructor(serverView, authMapper).createMock(); + SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) + .withConstructor(serverView, authMapper) + .createMock(); EasyMock.replay(serversTable); EasyMock.expect(serverView.getDruidServers()) @@ -503,14 +503,17 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = serversTable.scan(dataContext); - Assert.assertEquals(2, rows.count()); - Object[] row1 = rows.first(); + final List rows = serversTable.scan(dataContext).toList(); + Assert.assertEquals(2, rows.size()); + Object[] row1 = rows.get(0); Assert.assertEquals("localhost:0000", row1[0]); Assert.assertEquals("realtime", row1[4].toString()); - Object[] row2 = rows.last(); + Object[] row2 = rows.get(1); Assert.assertEquals("server2:1234", row2[0]); Assert.assertEquals("historical", row2[4].toString()); + + // Verify value types. + verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE); } @Test @@ -550,8 +553,6 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = serverSegmentsTable.scan(dataContext); - Assert.assertEquals(5, rows.count()); //server_segments table is the join of servers and segments table // it will have 5 rows as follows @@ -561,34 +562,31 @@ public class SystemSchemaTest extends CalciteTestBase // server2:1234 | test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4) // server2:1234 | test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5) - Enumerator enumerator = rows.enumerator(); - Assert.assertEquals(true, enumerator.moveNext()); + final List rows = serverSegmentsTable.scan(dataContext).toList(); + Assert.assertEquals(5, rows.size()); - Object[] row1 = rows.first(); + Object[] row0 = rows.get(0); + Assert.assertEquals("localhost:0000", row0[0]); + Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row0[1]); + + Object[] row1 = rows.get(1); Assert.assertEquals("localhost:0000", row1[0]); - Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]); + Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row1[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row2 = enumerator.current(); - Assert.assertEquals("localhost:0000", row2[0]); - Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]); + Object[] row2 = rows.get(2); + Assert.assertEquals("server2:1234", row2[0]); + Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row3 = enumerator.current(); + Object[] row3 = rows.get(3); Assert.assertEquals("server2:1234", row3[0]); - Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]); + Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row4 = enumerator.current(); + Object[] row4 = rows.get(4); Assert.assertEquals("server2:1234", row4[0]); - Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]); + Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row5 = rows.last(); - Assert.assertEquals("server2:1234", row5[0]); - Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]); - - Assert.assertEquals(false, enumerator.moveNext()); + // Verify value types. + verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE); } @Test @@ -672,26 +670,78 @@ public class SystemSchemaTest extends CalciteTestBase return CalciteTests.SUPER_USER_AUTH_RESULT; } }; - Enumerable rows = tasksTable.scan(dataContext); - Enumerator enumerator = rows.enumerator(); + final List rows = tasksTable.scan(dataContext).toList(); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row1 = enumerator.current(); - Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row1[0].toString()); - Assert.assertEquals("FAILED", row1[5].toString()); - Assert.assertEquals("NONE", row1[6].toString()); - Assert.assertEquals(-1L, row1[7]); - Assert.assertEquals("testHost:1234", row1[8]); + Object[] row0 = rows.get(0); + Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString()); + Assert.assertEquals("FAILED", row0[5].toString()); + Assert.assertEquals("NONE", row0[6].toString()); + Assert.assertEquals(-1L, row0[7]); + Assert.assertEquals("testHost:1234", row0[8]); - Assert.assertEquals(true, enumerator.moveNext()); - Object[] row2 = enumerator.current(); - Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row2[0].toString()); - Assert.assertEquals("RUNNING", row2[5].toString()); - Assert.assertEquals("RUNNING", row2[6].toString()); - Assert.assertEquals(null, row2[7]); - Assert.assertEquals("192.168.1.6:8100", row2[8]); + Object[] row1 = rows.get(1); + Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); + Assert.assertEquals("RUNNING", row1[5].toString()); + Assert.assertEquals("RUNNING", row1[6].toString()); + Assert.assertEquals(0L, row1[7]); + Assert.assertEquals("192.168.1.6:8100", row1[8]); - Assert.assertEquals(false, enumerator.moveNext()); + // Verify value types. + verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); } + private static void verifyTypes(final List rows, final RowSignature signature) + { + final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl()); + + for (Object[] row : rows) { + Assert.assertEquals(row.length, signature.getRowOrder().size()); + + for (int i = 0; i < row.length; i++) { + final Class expectedClass; + + final ValueType columnType = signature.getColumnType(signature.getRowOrder().get(i)); + final boolean nullable = rowType.getFieldList().get(i).getType().isNullable(); + + switch (columnType) { + case LONG: + expectedClass = Long.class; + break; + case FLOAT: + expectedClass = Float.class; + break; + case DOUBLE: + expectedClass = Double.class; + break; + case STRING: + expectedClass = String.class; + break; + default: + throw new IAE("Don't know what class to expect for valueType[%s]", columnType); + } + + if (nullable) { + Assert.assertTrue( + StringUtils.format( + "Column[%s] is a [%s] or null (was %s)", + signature.getRowOrder().get(i), + expectedClass.getName(), + row[i] == null ? null : row[i].getClass().getName() + ), + row[i] == null || expectedClass.isAssignableFrom(row[i].getClass()) + ); + } else { + Assert.assertTrue( + StringUtils.format( + "Column[%s] is a [%s] (was %s)", + signature.getRowOrder().get(i), + expectedClass.getName(), + row[i] == null ? null : row[i].getClass().getName() + ), + row[i] != null && expectedClass.isAssignableFrom(row[i].getClass()) + ); + } + } + } + } }