SQL: Add ProjectableFilterableTable to SegmentsTable. (#16841)

* SQL: Add ProjectableFilterableTable to SegmentsTable.

This allows us to skip serialization of expensive fields such as
shard_spec, dimensions, metrics, and last_compaction_state, if those
fields are not actually being queried.

* Restructure logic to avoid unnecessary toString() as well.
This commit is contained in:
Gian Merlino 2024-08-06 06:40:21 -07:00 committed by GitHub
parent c3aa033e14
commit de40d81b29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 201 additions and 71 deletions

View File

@ -47,6 +47,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlNode;
@ -285,7 +286,8 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
{
if (node instanceof TableScan) {
RelOptTable table = node.getTable();
if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
if ((table.unwrap(ScannableTable.class) != null || table.unwrap(ProjectableFilterableTable.class) != null)
&& table.unwrap(DruidTable.class) == null) {
found.add(table);
return;
}

View File

@ -32,6 +32,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.DefaultEnumerable;
import org.apache.calcite.linq4j.Enumerable;
@ -39,6 +41,8 @@ import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
@ -68,6 +72,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHo
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.Access;
@ -99,6 +104,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SystemSchema extends AbstractSchema
{
@ -157,6 +163,23 @@ public class SystemSchema extends AbstractSchema
.add("replication_factor", ColumnType.LONG)
.build();
/**
* List of [0..n) where n is the size of {@link #SEGMENTS_SIGNATURE}.
*/
private static final int[] SEGMENTS_PROJECT_ALL = IntStream.range(0, SEGMENTS_SIGNATURE.size()).toArray();
/**
* Fields in {@link #SEGMENTS_SIGNATURE} that are serialized with {@link ObjectMapper#writeValueAsString(Object)}.
*/
private static final IntSet SEGMENTS_JSON_FIELDS = new IntOpenHashSet(
new int[]{
SEGMENTS_SIGNATURE.indexOf("shard_spec"),
SEGMENTS_SIGNATURE.indexOf("dimensions"),
SEGMENTS_SIGNATURE.indexOf("metrics"),
SEGMENTS_SIGNATURE.indexOf("last_compaction_state")
}
);
static final RowSignature SERVERS_SIGNATURE = RowSignature
.builder()
.add("server", ColumnType.STRING)
@ -241,7 +264,7 @@ public class SystemSchema extends AbstractSchema
/**
* This table contains row per segment from metadata store as well as served segments.
*/
static class SegmentsTable extends AbstractTable implements ScannableTable
static class SegmentsTable extends AbstractTable implements ProjectableFilterableTable
{
private final DruidSchema druidSchema;
private final ObjectMapper jsonMapper;
@ -274,7 +297,11 @@ public class SystemSchema extends AbstractSchema
}
@Override
public Enumerable<Object[]> scan(DataContext root)
public Enumerable<Object[]> scan(
final DataContext root,
final List<RexNode> filters,
@Nullable final int[] projects
)
{
// get available segments from druidSchema
final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
@ -327,35 +354,30 @@ public class SystemSchema extends AbstractSchema
// is_active is true for published segments that are not overshadowed or else they should be realtime
boolean isActive = isPublished ? !val.isOvershadowed() : val.isRealtime();
try {
return new Object[]{
segment.getId(),
segment.getDataSource(),
segment.getInterval().getStart().toString(),
segment.getInterval().getEnd().toString(),
segment.getSize(),
segment.getVersion(),
(long) segment.getShardSpec().getPartitionNum(),
numReplicas,
numRows,
isActive ? IS_ACTIVE_TRUE : IS_ACTIVE_FALSE,
isPublished ? IS_PUBLISHED_TRUE : IS_PUBLISHED_FALSE,
isAvailable,
isRealtime,
val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()),
segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()),
segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()),
segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()),
// If the segment is unpublished, we won't have this information yet.
// If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor.
// This should be automatically updated in the next Coordinator poll.
val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor()
};
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new Object[]{
segment.getId(),
segment.getDataSource(),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getSize(),
segment.getVersion(),
(long) segment.getShardSpec().getPartitionNum(),
numReplicas,
numRows,
isActive ? IS_ACTIVE_TRUE : IS_ACTIVE_FALSE,
isPublished ? IS_PUBLISHED_TRUE : IS_PUBLISHED_FALSE,
isAvailable,
isRealtime,
val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
segment.getShardSpec(),
segment.getDimensions(),
segment.getMetrics(),
segment.getLastCompactionState(),
// If the segment is unpublished, we won't have this information yet.
// If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor.
// This should be automatically updated in the next Coordinator poll.
val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor()
};
});
// If druid.centralizedDatasourceSchema.enabled is set on the Coordinator, all the segments in this loop
@ -369,45 +391,43 @@ public class SystemSchema extends AbstractSchema
if (segmentsAlreadySeen.contains(val.getKey())) {
return null;
}
final DataSegment segment = val.getValue().getSegment();
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
try {
return new Object[]{
val.getKey(),
val.getKey().getDataSource(),
val.getKey().getInterval().getStart().toString(),
val.getKey().getInterval().getEnd().toString(),
val.getValue().getSegment().getSize(),
val.getKey().getVersion(),
(long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
numReplicas,
val.getValue().getNumRows(),
// is_active is true for unpublished segments iff they are realtime
val.getValue().isRealtime() /* is_active */,
// is_published is false for unpublished segments
IS_PUBLISHED_FALSE,
// is_available is assumed to be always true for segments announced by historicals or realtime tasks
IS_AVAILABLE_TRUE,
val.getValue().isRealtime(),
IS_OVERSHADOWED_FALSE,
// there is an assumption here that unpublished segments are never overshadowed
val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
null, // unpublished segments from realtime tasks will not be compacted yet
REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet.
};
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new Object[]{
val.getKey(),
val.getKey().getDataSource(),
val.getKey().getInterval().getStart(),
val.getKey().getInterval().getEnd(),
segment.getSize(),
val.getKey().getVersion(),
(long) segment.getShardSpec().getPartitionNum(),
numReplicas,
val.getValue().getNumRows(),
// is_active is true for unpublished segments iff they are realtime
val.getValue().isRealtime() /* is_active */,
// is_published is false for unpublished segments
IS_PUBLISHED_FALSE,
// is_available is assumed to be always true for segments announced by historicals or realtime tasks
IS_AVAILABLE_TRUE,
val.getValue().isRealtime(),
IS_OVERSHADOWED_FALSE,
// there is an assumption here that unpublished segments are never overshadowed
segment.getShardSpec(),
segment.getDimensions(),
segment.getMetrics(),
null, // unpublished segments from realtime tasks will not be compacted yet
REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet.
};
});
final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
Iterables.concat(publishedSegments, availableSegments)
);
return Linq4j.asEnumerable(allSegments).where(Objects::nonNull);
return Linq4j.asEnumerable(allSegments)
.where(Objects::nonNull)
.select(row -> projectSegmentsRow(row, projects, jsonMapper));
}
private Iterator<SegmentStatusInCluster> getAuthorizedPublishedSegments(
@ -638,7 +658,10 @@ public class SystemSchema extends AbstractSchema
/**
* Returns a row for all node types which don't serve data. The returned row contains only static information.
*/
private static Object[] buildRowForNonDataServerWithLeadership(DiscoveryDruidNode discoveryDruidNode, boolean isLeader)
private static Object[] buildRowForNonDataServerWithLeadership(
DiscoveryDruidNode discoveryDruidNode,
boolean isLeader
)
{
final DruidNode node = discoveryDruidNode.getDruidNode();
return new Object[]{
@ -775,7 +798,7 @@ public class SystemSchema extends AbstractSchema
for (DataSegment segment : authorizedServerSegments) {
Object[] row = new Object[serverSegmentsTableSize];
row[0] = druidServer.getHost();
row[1] = segment.getId();
row[1] = segment.getId().toString();
rows.add(row);
}
}
@ -1138,4 +1161,44 @@ public class SystemSchema extends AbstractSchema
throw new ForbiddenException("Insufficient permission to view servers: " + stateAccess.toMessage());
}
}
/**
* Project a row using "projects" from {@link SegmentsTable#scan(DataContext, List, int[])}.
*
* Also, fix up types so {@link ColumnType#STRING} are transformed to Strings if they aren't yet. This defers
* computation of {@link ObjectMapper#writeValueAsString(Object)} or {@link Object#toString()} until we know we
* actually need it.
*/
private static Object[] projectSegmentsRow(
final Object[] row,
@Nullable final int[] projects,
final ObjectMapper jsonMapper
)
{
final int[] nonNullProjects = projects == null ? SEGMENTS_PROJECT_ALL : projects;
final Object[] projectedRow = new Object[nonNullProjects.length];
for (int i = 0; i < nonNullProjects.length; i++) {
final Object o = row[nonNullProjects[i]];
if (SEGMENTS_SIGNATURE.getColumnType(nonNullProjects[i]).get().is(ValueType.STRING)
&& o != null
&& !(o instanceof String)) {
// Delay calling toString() or ObjectMapper#writeValueAsString() until we know we actually need this field.
if (SEGMENTS_JSON_FIELDS.contains(nonNullProjects[i])) {
try {
projectedRow[i] = jsonMapper.writeValueAsString(o);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} else {
projectedRow[i] = o.toString();
}
} else {
projectedRow[i] = o;
}
}
return projectedRow;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@ -579,7 +580,7 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
final List<Object[]> rows = segmentsTable.scan(dataContext, Collections.emptyList(), null).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
// total segments = 8
@ -717,6 +718,74 @@ public class SystemSchemaTest extends CalciteTestBase
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
}
@Test
public void testSegmentsTableWithProjection() throws JsonProcessingException
{
final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper);
final Set<SegmentStatusInCluster> publishedSegments = new HashSet<>(Arrays.asList(
new SegmentStatusInCluster(publishedCompactedSegment1, true, 2, null, false),
new SegmentStatusInCluster(publishedCompactedSegment2, false, 0, null, false),
new SegmentStatusInCluster(publishedUncompactedSegment3, false, 2, null, false),
new SegmentStatusInCluster(segment1, true, 2, null, false),
new SegmentStatusInCluster(segment2, false, 0, null, false)
));
EasyMock.expect(metadataView.getSegments()).andReturn(publishedSegments.iterator()).once();
EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = segmentsTable.scan(
dataContext,
Collections.emptyList(),
new int[]{
SystemSchema.SEGMENTS_SIGNATURE.indexOf("last_compaction_state"),
SystemSchema.SEGMENTS_SIGNATURE.indexOf("segment_id")
}
).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[1]).compareTo(row2[1]));
// total segments = 8
// segments test1, test2 are published and available
// segment test3 is served by historical but unpublished or unused
// segments test4, test5 are not published but available (realtime segments)
// segment test2 is both published and served by a realtime server.
Assert.assertEquals(8, rows.size());
Assert.assertNull(null, rows.get(0)[0]);
Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", rows.get(0)[1]);
Assert.assertNull(null, rows.get(1)[0]);
Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", rows.get(1)[1]);
Assert.assertNull(null, rows.get(2)[0]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", rows.get(2)[1]);
Assert.assertNull(null, rows.get(3)[0]);
Assert.assertEquals("test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", rows.get(3)[1]);
Assert.assertNull(null, rows.get(4)[0]);
Assert.assertEquals("test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", rows.get(4)[1]);
Assert.assertEquals(mapper.writeValueAsString(expectedCompactionState), rows.get(5)[0]);
Assert.assertEquals("wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", rows.get(5)[1]);
Assert.assertEquals(mapper.writeValueAsString(expectedCompactionState), rows.get(6)[0]);
Assert.assertEquals("wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2", rows.get(6)[1]);
Assert.assertNull(null, rows.get(7)[0]);
Assert.assertEquals("wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3", rows.get(7)[1]);
// Verify value types.
verifyTypes(
rows,
RowSignature.builder()
.add("last_compaction_state", ColumnType.STRING)
.add("segment_id", ColumnType.STRING)
.build()
);
}
private void verifyRow(
Object[] row,
String segmentId,
@ -1519,11 +1588,7 @@ public class SystemSchemaTest extends CalciteTestBase
expectedClass = Double.class;
break;
case STRING:
if (signature.getColumnName(i).equals("segment_id")) {
expectedClass = SegmentId.class;
} else {
expectedClass = String.class;
}
expectedClass = String.class;
break;
default:
throw new IAE("Don't know what class to expect for valueType[%s]", columnType);