diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
index b543be135cc..82dd6afe8c9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -47,6 +47,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlNode;
@@ -285,7 +286,8 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
       {
         if (node instanceof TableScan) {
           RelOptTable table = node.getTable();
-          if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
+          if ((table.unwrap(ScannableTable.class) != null || table.unwrap(ProjectableFilterableTable.class) != null)
+              && table.unwrap(DruidTable.class) == null) {
             found.add(table);
             return;
           }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 1706620f79c..695b34b2772 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.inject.Inject;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.DefaultEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
@@ -39,6 +41,8 @@ import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
@@ -68,6 +72,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHo
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.security.Access;
@@ -99,6 +104,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class SystemSchema extends AbstractSchema
 {
@@ -157,6 +163,23 @@ public class SystemSchema extends AbstractSchema
       .add("replication_factor", ColumnType.LONG)
       .build();
 
+  /**
+   * List of [0..n) where n is the size of {@link #SEGMENTS_SIGNATURE}.
+   */
+  private static final int[] SEGMENTS_PROJECT_ALL = IntStream.range(0, SEGMENTS_SIGNATURE.size()).toArray();
+
+  /**
+   * Fields in {@link #SEGMENTS_SIGNATURE} that are serialized with {@link ObjectMapper#writeValueAsString(Object)}.
+   */
+  private static final IntSet SEGMENTS_JSON_FIELDS = new IntOpenHashSet(
+      new int[]{
+          SEGMENTS_SIGNATURE.indexOf("shard_spec"),
+          SEGMENTS_SIGNATURE.indexOf("dimensions"),
+          SEGMENTS_SIGNATURE.indexOf("metrics"),
+          SEGMENTS_SIGNATURE.indexOf("last_compaction_state")
+      }
+  );
+
   static final RowSignature SERVERS_SIGNATURE = RowSignature
       .builder()
       .add("server", ColumnType.STRING)
@@ -241,7 +264,7 @@ public class SystemSchema extends AbstractSchema
   /**
    * This table contains row per segment from metadata store as well as served segments.
    */
-  static class SegmentsTable extends AbstractTable implements ScannableTable
+  static class SegmentsTable extends AbstractTable implements ProjectableFilterableTable
   {
     private final DruidSchema druidSchema;
     private final ObjectMapper jsonMapper;
@@ -274,7 +297,11 @@ public class SystemSchema extends AbstractSchema
     }
 
     @Override
-    public Enumerable<Object[]> scan(DataContext root)
+    public Enumerable<Object[]> scan(
+        final DataContext root,
+        final List<RexNode> filters,
+        @Nullable final int[] projects
+    )
     {
       // get available segments from druidSchema
      final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
@@ -327,35 +354,30 @@ public class SystemSchema extends AbstractSchema
 
             // is_active is true for published segments that are not overshadowed or else they should be realtime
             boolean isActive = isPublished ? !val.isOvershadowed() : val.isRealtime();
-            try {
-              return new Object[]{
-                  segment.getId(),
-                  segment.getDataSource(),
-                  segment.getInterval().getStart().toString(),
-                  segment.getInterval().getEnd().toString(),
-                  segment.getSize(),
-                  segment.getVersion(),
-                  (long) segment.getShardSpec().getPartitionNum(),
-                  numReplicas,
-                  numRows,
-                  isActive ? IS_ACTIVE_TRUE : IS_ACTIVE_FALSE,
-                  isPublished ? IS_PUBLISHED_TRUE : IS_PUBLISHED_FALSE,
-                  isAvailable,
-                  isRealtime,
-                  val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
-                  segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()),
-                  segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()),
-                  segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()),
-                  segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()),
-                  // If the segment is unpublished, we won't have this information yet.
-                  // If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor.
-                  // This should be automatically updated in the next Coordinator poll.
-                  val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor()
-              };
-            }
-            catch (JsonProcessingException e) {
-              throw new RuntimeException(e);
-            }
+            return new Object[]{
+                segment.getId(),
+                segment.getDataSource(),
+                segment.getInterval().getStart(),
+                segment.getInterval().getEnd(),
+                segment.getSize(),
+                segment.getVersion(),
+                (long) segment.getShardSpec().getPartitionNum(),
+                numReplicas,
+                numRows,
+                isActive ? IS_ACTIVE_TRUE : IS_ACTIVE_FALSE,
+                isPublished ? IS_PUBLISHED_TRUE : IS_PUBLISHED_FALSE,
+                isAvailable,
+                isRealtime,
+                val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
+                segment.getShardSpec(),
+                segment.getDimensions(),
+                segment.getMetrics(),
+                segment.getLastCompactionState(),
+                // If the segment is unpublished, we won't have this information yet.
+                // If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor.
+                // This should be automatically updated in the next Coordinator poll.
+                val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor()
+            };
           });
 
       // If druid.centralizedDatasourceSchema.enabled is set on the Coordinator, all the segments in this loop
@@ -369,45 +391,43 @@ public class SystemSchema extends AbstractSchema
             if (segmentsAlreadySeen.contains(val.getKey())) {
               return null;
             }
+            final DataSegment segment = val.getValue().getSegment();
             final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
             final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
-            try {
-              return new Object[]{
-                  val.getKey(),
-                  val.getKey().getDataSource(),
-                  val.getKey().getInterval().getStart().toString(),
-                  val.getKey().getInterval().getEnd().toString(),
-                  val.getValue().getSegment().getSize(),
-                  val.getKey().getVersion(),
-                  (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
-                  numReplicas,
-                  val.getValue().getNumRows(),
-                  // is_active is true for unpublished segments iff they are realtime
-                  val.getValue().isRealtime() /* is_active */,
-                  // is_published is false for unpublished segments
-                  IS_PUBLISHED_FALSE,
-                  // is_available is assumed to be always true for segments announced by historicals or realtime tasks
-                  IS_AVAILABLE_TRUE,
-                  val.getValue().isRealtime(),
-                  IS_OVERSHADOWED_FALSE,
-                  // there is an assumption here that unpublished segments are never overshadowed
-                  val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
-                  val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
-                  val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
-                  null, // unpublished segments from realtime tasks will not be compacted yet
-                  REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet.
-              };
-            }
-            catch (JsonProcessingException e) {
-              throw new RuntimeException(e);
-            }
+            return new Object[]{
+                val.getKey(),
+                val.getKey().getDataSource(),
+                val.getKey().getInterval().getStart(),
+                val.getKey().getInterval().getEnd(),
+                segment.getSize(),
+                val.getKey().getVersion(),
+                (long) segment.getShardSpec().getPartitionNum(),
+                numReplicas,
+                val.getValue().getNumRows(),
+                // is_active is true for unpublished segments iff they are realtime
+                val.getValue().isRealtime() /* is_active */,
+                // is_published is false for unpublished segments
+                IS_PUBLISHED_FALSE,
+                // is_available is assumed to be always true for segments announced by historicals or realtime tasks
+                IS_AVAILABLE_TRUE,
+                val.getValue().isRealtime(),
+                IS_OVERSHADOWED_FALSE,
+                // there is an assumption here that unpublished segments are never overshadowed
+                segment.getShardSpec(),
+                segment.getDimensions(),
+                segment.getMetrics(),
+                null, // unpublished segments from realtime tasks will not be compacted yet
+                REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet.
+            };
           });
 
      final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
          Iterables.concat(publishedSegments, availableSegments)
      );
 
-      return Linq4j.asEnumerable(allSegments).where(Objects::nonNull);
+      return Linq4j.asEnumerable(allSegments)
+                   .where(Objects::nonNull)
+                   .select(row -> projectSegmentsRow(row, projects, jsonMapper));
     }
 
    private Iterator<SegmentStatusInCluster> getAuthorizedPublishedSegments(
@@ -638,7 +658,10 @@ public class SystemSchema extends AbstractSchema
     /**
      * Returns a row for all node types which don't serve data. The returned row contains only static information.
      */
-    private static Object[] buildRowForNonDataServerWithLeadership(DiscoveryDruidNode discoveryDruidNode, boolean isLeader)
+    private static Object[] buildRowForNonDataServerWithLeadership(
+        DiscoveryDruidNode discoveryDruidNode,
+        boolean isLeader
+    )
     {
       final DruidNode node = discoveryDruidNode.getDruidNode();
       return new Object[]{
@@ -775,7 +798,7 @@ public class SystemSchema extends AbstractSchema
         for (DataSegment segment : authorizedServerSegments) {
           Object[] row = new Object[serverSegmentsTableSize];
           row[0] = druidServer.getHost();
-          row[1] = segment.getId();
+          row[1] = segment.getId().toString();
           rows.add(row);
         }
       }
@@ -1138,4 +1161,44 @@ public class SystemSchema extends AbstractSchema
       throw new ForbiddenException("Insufficient permission to view servers: " + stateAccess.toMessage());
     }
   }
+
+  /**
+   * Project a row using "projects" from {@link SegmentsTable#scan(DataContext, List, int[])}.
+   *
+   * Also, fix up types so {@link ColumnType#STRING} are transformed to Strings if they aren't yet. This defers
+   * computation of {@link ObjectMapper#writeValueAsString(Object)} or {@link Object#toString()} until we know we
+   * actually need it.
+   */
+  private static Object[] projectSegmentsRow(
+      final Object[] row,
+      @Nullable final int[] projects,
+      final ObjectMapper jsonMapper
+  )
+  {
+    final int[] nonNullProjects = projects == null ? SEGMENTS_PROJECT_ALL : projects;
+    final Object[] projectedRow = new Object[nonNullProjects.length];
+
+    for (int i = 0; i < nonNullProjects.length; i++) {
+      final Object o = row[nonNullProjects[i]];
+
+      if (SEGMENTS_SIGNATURE.getColumnType(nonNullProjects[i]).get().is(ValueType.STRING)
+          && o != null
+          && !(o instanceof String)) {
+        // Delay calling toString() or ObjectMapper#writeValueAsString() until we know we actually need this field.
+        if (SEGMENTS_JSON_FIELDS.contains(nonNullProjects[i])) {
+          try {
+            projectedRow[i] = jsonMapper.writeValueAsString(o);
+          }
+          catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          projectedRow[i] = o.toString();
+        }
+      } else {
+        projectedRow[i] = o;
+      }
+    }
+    return projectedRow;
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 992d91dabe8..de5ffbb5f19 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
@@ -579,7 +580,7 @@
 
     EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
     DataContext dataContext = createDataContext(Users.SUPER);
-    final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
+    final List<Object[]> rows = segmentsTable.scan(dataContext, Collections.emptyList(), null).toList();
     rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
 
     // total segments = 8
@@ -717,6 +718,74 @@
     verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
   }
 
+  @Test
+  public void testSegmentsTableWithProjection() throws JsonProcessingException
+  {
+    final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper);
+    final Set<SegmentStatusInCluster> publishedSegments = new HashSet<>(Arrays.asList(
+        new SegmentStatusInCluster(publishedCompactedSegment1, true, 2, null, false),
+        new SegmentStatusInCluster(publishedCompactedSegment2, false, 0, null, false),
+        new SegmentStatusInCluster(publishedUncompactedSegment3, false, 2, null, false),
+        new SegmentStatusInCluster(segment1, true, 2, null, false),
+        new SegmentStatusInCluster(segment2, false, 0, null, false)
+    ));
+
+    EasyMock.expect(metadataView.getSegments()).andReturn(publishedSegments.iterator()).once();
+
+    EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
+    DataContext dataContext = createDataContext(Users.SUPER);
+    final List<Object[]> rows = segmentsTable.scan(
+        dataContext,
+        Collections.emptyList(),
+        new int[]{
+            SystemSchema.SEGMENTS_SIGNATURE.indexOf("last_compaction_state"),
+            SystemSchema.SEGMENTS_SIGNATURE.indexOf("segment_id")
+        }
+    ).toList();
+    rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[1]).compareTo(row2[1]));
+
+    // total segments = 8
+    // segments test1, test2 are published and available
+    // segment test3 is served by historical but unpublished or unused
+    // segments test4, test5 are not published but available (realtime segments)
+    // segment test2 is both published and served by a realtime server.
+
+    Assert.assertEquals(8, rows.size());
+
+    Assert.assertNull(null, rows.get(0)[0]);
+    Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", rows.get(0)[1]);
+
+    Assert.assertNull(null, rows.get(1)[0]);
+    Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", rows.get(1)[1]);
+
+    Assert.assertNull(null, rows.get(2)[0]);
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", rows.get(2)[1]);
+
+    Assert.assertNull(null, rows.get(3)[0]);
+    Assert.assertEquals("test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", rows.get(3)[1]);
+
+    Assert.assertNull(null, rows.get(4)[0]);
+    Assert.assertEquals("test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", rows.get(4)[1]);
+
+    Assert.assertEquals(mapper.writeValueAsString(expectedCompactionState), rows.get(5)[0]);
+    Assert.assertEquals("wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", rows.get(5)[1]);
+
+    Assert.assertEquals(mapper.writeValueAsString(expectedCompactionState), rows.get(6)[0]);
+    Assert.assertEquals("wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2", rows.get(6)[1]);
+
+    Assert.assertNull(null, rows.get(7)[0]);
+    Assert.assertEquals("wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3", rows.get(7)[1]);
+
+    // Verify value types.
+    verifyTypes(
+        rows,
+        RowSignature.builder()
+                    .add("last_compaction_state", ColumnType.STRING)
+                    .add("segment_id", ColumnType.STRING)
+                    .build()
+    );
+  }
+
   private void verifyRow(
       Object[] row,
       String segmentId,
@@ -1519,11 +1588,7 @@
           expectedClass = Double.class;
           break;
         case STRING:
-          if (signature.getColumnName(i).equals("segment_id")) {
-            expectedClass = SegmentId.class;
-          } else {
-            expectedClass = String.class;
-          }
+          expectedClass = String.class;
           break;
         default:
           throw new IAE("Don't know what class to expect for valueType[%s]", columnType);