SystemSchema: Fix data types for various fields. (#6642)

* SystemSchema: Fix data types for various fields.

- segments: start, end, partition_num
- servers: plaintext_port, tls_port
- tasks: plaintext_port, tls_port

The declared and actual types did not match, but they must or
else queries may generate ClassCastExceptions.

Also adjusted some of the code for generating values to be more
robust in the face of nulls or malformed strings.

* Fix style.
This commit is contained in:
Gian Merlino 2018-11-18 17:24:19 -08:00 committed by Fangjin Yang
parent 7cd457f41c
commit e9c3d3e651
2 changed files with 219 additions and 117 deletions

View File

@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.calcite.DataContext;
@ -50,7 +51,6 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -88,9 +89,7 @@ public class SystemSchema extends AbstractSchema
private static final String SERVER_SEGMENTS_TABLE = "server_segments";
private static final String TASKS_TABLE = "tasks";
private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
static final RowSignature SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("segment_id", ValueType.STRING)
.add("datasource", ValueType.STRING)
@ -98,7 +97,7 @@ public class SystemSchema extends AbstractSchema
.add("end", ValueType.STRING)
.add("size", ValueType.LONG)
.add("version", ValueType.STRING)
.add("partition_num", ValueType.STRING)
.add("partition_num", ValueType.LONG)
.add("num_replicas", ValueType.LONG)
.add("num_rows", ValueType.LONG)
.add("is_published", ValueType.LONG)
@ -107,25 +106,25 @@ public class SystemSchema extends AbstractSchema
.add("payload", ValueType.STRING)
.build();
private static final RowSignature SERVERS_SIGNATURE = RowSignature
static final RowSignature SERVERS_SIGNATURE = RowSignature
.builder()
.add("server", ValueType.STRING)
.add("host", ValueType.STRING)
.add("plaintext_port", ValueType.STRING)
.add("tls_port", ValueType.STRING)
.add("plaintext_port", ValueType.LONG)
.add("tls_port", ValueType.LONG)
.add("server_type", ValueType.STRING)
.add("tier", ValueType.STRING)
.add("curr_size", ValueType.LONG)
.add("max_size", ValueType.LONG)
.build();
private static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("server", ValueType.STRING)
.add("segment_id", ValueType.STRING)
.build();
private static final RowSignature TASKS_SIGNATURE = RowSignature
static final RowSignature TASKS_SIGNATURE = RowSignature
.builder()
.add("task_id", ValueType.STRING)
.add("type", ValueType.STRING)
@ -137,8 +136,8 @@ public class SystemSchema extends AbstractSchema
.add("duration", ValueType.LONG)
.add("location", ValueType.STRING)
.add("host", ValueType.STRING)
.add("plaintext_port", ValueType.STRING)
.add("tls_port", ValueType.STRING)
.add("plaintext_port", ValueType.LONG)
.add("tls_port", ValueType.LONG)
.add("error_msg", ValueType.STRING)
.build();
@ -156,12 +155,24 @@ public class SystemSchema extends AbstractSchema
{
Preconditions.checkNotNull(serverView, "serverView");
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
SERVERS_TABLE, new ServersTable(serverView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
);
this.tableMap = ImmutableMap.<String, Table>builder()
.put(
SEGMENTS_TABLE,
new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
)
.put(
SERVERS_TABLE,
new ServersTable(serverView, authorizerMapper)
)
.put(
SERVER_SEGMENTS_TABLE,
new ServerSegmentsTable(serverView, authorizerMapper)
)
.put(
TASKS_TABLE,
new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
)
.build();
}
@Override
@ -248,11 +259,11 @@ public class SystemSchema extends AbstractSchema
return new Object[]{
val.getIdentifier(),
val.getDataSource(),
val.getInterval().getStart(),
val.getInterval().getEnd(),
val.getInterval().getStart().toString(),
val.getInterval().getEnd().toString(),
val.getSize(),
val.getVersion(),
val.getShardSpec().getPartitionNum(),
Long.valueOf(val.getShardSpec().getPartitionNum()),
numReplicas,
numRows,
1L, //is_published is true for published segments
@ -276,16 +287,17 @@ public class SystemSchema extends AbstractSchema
if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) {
return null;
}
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getIdentifier());
final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
return new Object[]{
val.getKey().getIdentifier(),
val.getKey().getDataSource(),
val.getKey().getInterval().getStart(),
val.getKey().getInterval().getEnd(),
val.getKey().getInterval().getStart().toString(),
val.getKey().getInterval().getEnd().toString(),
val.getKey().getSize(),
val.getKey().getVersion(),
val.getKey().getShardSpec().getPartitionNum(),
partialSegmentDataMap.get(val.getKey().getIdentifier()) == null ? 0L
: partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(),
Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
numReplicas,
val.getValue().getNumRows(),
val.getValue().isPublished(),
val.getValue().isAvailable(),
@ -471,10 +483,10 @@ public class SystemSchema extends AbstractSchema
.from(druidServers)
.transform(val -> new Object[]{
val.getHost(),
val.getHost().split(":")[0],
val.getHostAndPort() == null ? -1 : val.getHostAndPort().split(":")[1],
val.getHostAndTlsPort() == null ? -1 : val.getHostAndTlsPort().split(":")[1],
val.getType(),
extractHost(val.getHost()),
(long) extractPort(val.getHostAndPort()),
(long) extractPort(val.getHostAndTlsPort()),
toStringOrNull(val.getType()),
val.getTier(),
val.getCurrSize(),
val.getMaxSize()
@ -583,24 +595,33 @@ public class SystemSchema extends AbstractSchema
@Override
public Object[] current()
{
TaskStatusPlus task = it.next();
return new Object[]{task.getId(),
task.getType(),
task.getDataSource(),
task.getCreatedTime(),
task.getQueueInsertionTime(),
task.getStatusCode(),
task.getRunnerStatusCode(),
task.getDuration(),
task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort()
== -1
? task.getLocation()
.getPort()
: task.getLocation().getTlsPort()),
task.getLocation().getHost(),
task.getLocation().getPort(),
task.getLocation().getTlsPort(),
task.getErrorMsg()};
final TaskStatusPlus task = it.next();
final String hostAndPort;
if (task.getLocation().getHost() == null) {
hostAndPort = null;
} else {
final int port = task.getLocation().getTlsPort() >= 0
? task.getLocation().getTlsPort()
: task.getLocation().getPort();
hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString();
}
return new Object[]{
task.getId(),
task.getType(),
task.getDataSource(),
toStringOrNull(task.getCreatedTime()),
toStringOrNull(task.getQueueInsertionTime()),
toStringOrNull(task.getStatusCode()),
toStringOrNull(task.getRunnerStatusCode()),
task.getDuration() == null ? 0L : task.getDuration(),
hostAndPort,
task.getLocation().getHost(),
(long) task.getLocation().getPort(),
(long) task.getLocation().getTlsPort(),
task.getErrorMsg()
};
}
@Override
@ -632,7 +653,10 @@ public class SystemSchema extends AbstractSchema
return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler));
}
private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(JsonParserIterator<TaskStatusPlus> it, DataContext root)
private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(
JsonParserIterator<TaskStatusPlus> it,
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
@ -723,4 +747,32 @@ public class SystemSchema extends AbstractSchema
};
}
@Nullable
private static String extractHost(@Nullable final String hostAndPort)
{
if (hostAndPort == null) {
return null;
}
return HostAndPort.fromString(hostAndPort).getHostText();
}
private static int extractPort(@Nullable final String hostAndPort)
{
if (hostAndPort == null) {
return -1;
}
return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1);
}
@Nullable
private static String toStringOrNull(@Nullable final Object object)
{
if (object == null) {
return null;
}
return object.toString();
}
}

View File

@ -26,8 +26,6 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
@ -39,8 +37,10 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
@ -54,6 +54,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
@ -64,6 +65,7 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -147,10 +149,10 @@ public class SystemSchemaTest extends CalciteTestBase
responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class)
.withConstructor()
.addMockedMethod(
"handleResponse",
HttpResponse.class,
HttpResponseHandler.TrafficCop.class
)
"handleResponse",
HttpResponse.class,
HttpResponseHandler.TrafficCop.class
)
.addMockedMethod("getStatus")
.createMock();
request = EasyMock.createMock(Request.class);
@ -441,36 +443,34 @@ public class SystemSchemaTest extends CalciteTestBase
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
Enumerable<Object[]> rows = segmentsTable.scan(dataContext);
Enumerator<Object[]> enumerator = rows.enumerator();
Assert.assertEquals(true, enumerator.moveNext());
Object[] row1 = enumerator.current();
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
Assert.assertEquals(6, rows.size());
Object[] row0 = rows.get(0);
//segment 6 is published and unavailable, num_replicas is 0
Assert.assertEquals(1L, row1[9]);
Assert.assertEquals(0L, row1[7]);
Assert.assertEquals(0L, row1[8]); //numRows = 0
Assert.assertEquals(1L, row0[9]);
Assert.assertEquals(0L, row0[7]);
Assert.assertEquals(0L, row0[8]); //numRows = 0
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
Object[] row5 = enumerator.current();
Object[] row4 = rows.get(4);
//segment 2 is published and has 2 replicas
Assert.assertEquals(1L, row5[9]);
Assert.assertEquals(2L, row5[7]);
Assert.assertEquals(3L, row5[8]); //numRows = 3
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(false, enumerator.moveNext());
Assert.assertEquals(1L, row4[9]);
Assert.assertEquals(2L, row4[7]);
Assert.assertEquals(3L, row4[8]); //numRows = 3
// Verify value types.
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
}
@Test
public void testServersTable()
{
SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class).withConstructor(serverView, authMapper).createMock();
SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class)
.withConstructor(serverView, authMapper)
.createMock();
EasyMock.replay(serversTable);
EasyMock.expect(serverView.getDruidServers())
@ -503,14 +503,17 @@ public class SystemSchemaTest extends CalciteTestBase
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
Enumerable<Object[]> rows = serversTable.scan(dataContext);
Assert.assertEquals(2, rows.count());
Object[] row1 = rows.first();
final List<Object[]> rows = serversTable.scan(dataContext).toList();
Assert.assertEquals(2, rows.size());
Object[] row1 = rows.get(0);
Assert.assertEquals("localhost:0000", row1[0]);
Assert.assertEquals("realtime", row1[4].toString());
Object[] row2 = rows.last();
Object[] row2 = rows.get(1);
Assert.assertEquals("server2:1234", row2[0]);
Assert.assertEquals("historical", row2[4].toString());
// Verify value types.
verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE);
}
@Test
@ -550,8 +553,6 @@ public class SystemSchemaTest extends CalciteTestBase
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
Enumerable<Object[]> rows = serverSegmentsTable.scan(dataContext);
Assert.assertEquals(5, rows.count());
//server_segments table is the join of servers and segments table
// it will have 5 rows as follows
@ -561,34 +562,31 @@ public class SystemSchemaTest extends CalciteTestBase
// server2:1234 | test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4)
// server2:1234 | test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5)
Enumerator<Object[]> enumerator = rows.enumerator();
Assert.assertEquals(true, enumerator.moveNext());
final List<Object[]> rows = serverSegmentsTable.scan(dataContext).toList();
Assert.assertEquals(5, rows.size());
Object[] row1 = rows.first();
Object[] row0 = rows.get(0);
Assert.assertEquals("localhost:0000", row0[0]);
Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row0[1]);
Object[] row1 = rows.get(1);
Assert.assertEquals("localhost:0000", row1[0]);
Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]);
Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row1[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row2 = enumerator.current();
Assert.assertEquals("localhost:0000", row2[0]);
Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]);
Object[] row2 = rows.get(2);
Assert.assertEquals("server2:1234", row2[0]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row3 = enumerator.current();
Object[] row3 = rows.get(3);
Assert.assertEquals("server2:1234", row3[0]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]);
Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row4 = enumerator.current();
Object[] row4 = rows.get(4);
Assert.assertEquals("server2:1234", row4[0]);
Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]);
Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row5 = rows.last();
Assert.assertEquals("server2:1234", row5[0]);
Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]);
Assert.assertEquals(false, enumerator.moveNext());
// Verify value types.
verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE);
}
@Test
@ -672,26 +670,78 @@ public class SystemSchemaTest extends CalciteTestBase
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
Enumerable<Object[]> rows = tasksTable.scan(dataContext);
Enumerator<Object[]> enumerator = rows.enumerator();
final List<Object[]> rows = tasksTable.scan(dataContext).toList();
Assert.assertEquals(true, enumerator.moveNext());
Object[] row1 = enumerator.current();
Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row1[0].toString());
Assert.assertEquals("FAILED", row1[5].toString());
Assert.assertEquals("NONE", row1[6].toString());
Assert.assertEquals(-1L, row1[7]);
Assert.assertEquals("testHost:1234", row1[8]);
Object[] row0 = rows.get(0);
Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString());
Assert.assertEquals("FAILED", row0[5].toString());
Assert.assertEquals("NONE", row0[6].toString());
Assert.assertEquals(-1L, row0[7]);
Assert.assertEquals("testHost:1234", row0[8]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row2 = enumerator.current();
Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row2[0].toString());
Assert.assertEquals("RUNNING", row2[5].toString());
Assert.assertEquals("RUNNING", row2[6].toString());
Assert.assertEquals(null, row2[7]);
Assert.assertEquals("192.168.1.6:8100", row2[8]);
Object[] row1 = rows.get(1);
Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString());
Assert.assertEquals("RUNNING", row1[5].toString());
Assert.assertEquals("RUNNING", row1[6].toString());
Assert.assertEquals(0L, row1[7]);
Assert.assertEquals("192.168.1.6:8100", row1[8]);
Assert.assertEquals(false, enumerator.moveNext());
// Verify value types.
verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);
}
private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
{
final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl());
for (Object[] row : rows) {
Assert.assertEquals(row.length, signature.getRowOrder().size());
for (int i = 0; i < row.length; i++) {
final Class<?> expectedClass;
final ValueType columnType = signature.getColumnType(signature.getRowOrder().get(i));
final boolean nullable = rowType.getFieldList().get(i).getType().isNullable();
switch (columnType) {
case LONG:
expectedClass = Long.class;
break;
case FLOAT:
expectedClass = Float.class;
break;
case DOUBLE:
expectedClass = Double.class;
break;
case STRING:
expectedClass = String.class;
break;
default:
throw new IAE("Don't know what class to expect for valueType[%s]", columnType);
}
if (nullable) {
Assert.assertTrue(
StringUtils.format(
"Column[%s] is a [%s] or null (was %s)",
signature.getRowOrder().get(i),
expectedClass.getName(),
row[i] == null ? null : row[i].getClass().getName()
),
row[i] == null || expectedClass.isAssignableFrom(row[i].getClass())
);
} else {
Assert.assertTrue(
StringUtils.format(
"Column[%s] is a [%s] (was %s)",
signature.getRowOrder().get(i),
expectedClass.getName(),
row[i] == null ? null : row[i].getClass().getName()
),
row[i] != null && expectedClass.isAssignableFrom(row[i].getClass())
);
}
}
}
}
}