diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
similarity index 77%
rename from sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
rename to sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
index 38ff92858ec..72b69facdc7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
@@ -27,28 +27,25 @@ import java.util.Set;
 /**
  * Immutable representation of RowSignature and other segment attributes needed by {@link SystemSchema.SegmentsTable}
+ * This class contains the metadata of segments announced by historicals or ingestion tasks.
  */
-public class SegmentMetadataHolder
+public class AvailableSegmentMetadata
 {
   public static Builder builder(
       SegmentId segmentId,
-      long isPublished,
-      long isAvailable,
       long isRealtime,
       Set<String> segmentServers,
       RowSignature rowSignature,
       long numRows
   )
   {
-    return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
+    return new Builder(segmentId, isRealtime, segmentServers, rowSignature, numRows);
   }
 
-  public static Builder from(SegmentMetadataHolder h)
+  public static Builder from(AvailableSegmentMetadata h)
   {
     return new Builder(
         h.getSegmentId(),
-        h.isPublished(),
-        h.isAvailable(),
         h.isRealtime(),
         h.getReplicas(),
         h.getRowSignature(),
@@ -58,10 +55,7 @@ public class SegmentMetadataHolder
   private final SegmentId segmentId;
   // Booleans represented as long type, where 1 = true and 0 = false
-  // to make it easy to count number of segments which are
-  // published, available or realtime etc.
-  private final long isPublished;
-  private final long isAvailable;
+  // to make it easy to count number of segments which are realtime
   private final long isRealtime;
   // set of servers that contain the segment
   private final Set<String> segmentServers;
@@ -69,27 +63,15 @@ public class SegmentMetadataHolder
   @Nullable
   private final RowSignature rowSignature;
 
-  private SegmentMetadataHolder(Builder builder)
+  private AvailableSegmentMetadata(Builder builder)
   {
     this.rowSignature = builder.rowSignature;
-    this.isPublished = builder.isPublished;
-    this.isAvailable = builder.isAvailable;
     this.isRealtime = builder.isRealtime;
     this.segmentServers = builder.segmentServers;
     this.numRows = builder.numRows;
     this.segmentId = builder.segmentId;
   }
 
-  public long isPublished()
-  {
-    return isPublished;
-  }
-
-  public long isAvailable()
-  {
-    return isAvailable;
-  }
-
   public long isRealtime()
   {
     return isRealtime;
@@ -124,8 +106,6 @@ public class SegmentMetadataHolder
   public static class Builder
   {
     private final SegmentId segmentId;
-    private final long isPublished;
-    private final long isAvailable;
     private final long isRealtime;
 
     private Set<String> segmentServers;
@@ -135,8 +115,6 @@ public class SegmentMetadataHolder
     private Builder(
         SegmentId segmentId,
-        long isPublished,
-        long isAvailable,
        long isRealtime,
        Set<String> servers,
        RowSignature rowSignature,
@@ -144,8 +122,6 @@ public class SegmentMetadataHolder
     )
     {
       this.segmentId = segmentId;
-      this.isPublished = isPublished;
-      this.isAvailable = isAvailable;
       this.isRealtime = isRealtime;
       this.segmentServers = servers;
       this.rowSignature = rowSignature;
@@ -170,9 +146,9 @@ public class SegmentMetadataHolder
       return this;
     }
 
-    public SegmentMetadataHolder build()
+    public AvailableSegmentMetadata build()
    {
-      return new SegmentMetadataHolder(this);
+      return new AvailableSegmentMetadata(this);
    }
  }
 }
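For orientation, a minimal usage sketch of the renamed class (illustrative only, not part of this patch; the server names and row count are made up). The builder no longer takes the published/available flags, and updates go through the copy-on-write from(...) path:

    // Hypothetical usage; segmentId would come from DataSegment#getId().
    AvailableSegmentMetadata announcedByTask = AvailableSegmentMetadata
        .builder(segmentId, 1 /* isRealtime */, ImmutableSet.of("indexer-task-1"), null, 0 /* numRows */)
        .build();

    // Later, when a historical also announces the segment and a metadata refresh reports row counts:
    AvailableSegmentMetadata refreshed = AvailableSegmentMetadata
        .from(announcedByTask)
        .withReplicas(ImmutableSet.of("indexer-task-1", "historical-1"))
        .withNumRows(5_000)
        .build();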
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 4aaa95e12a6..758a5f61f98 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -95,8 +95,6 @@ public class DruidSchema extends AbstractSchema
   private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
   private static final int MAX_SEGMENTS_PER_QUERY = 15000;
-  private static final long DEFAULT_IS_PUBLISHED = 0;
-  private static final long DEFAULT_IS_AVAILABLE = 1;
   private static final long DEFAULT_NUM_ROWS = 0;
 
   private final QueryLifecycleFactory queryLifecycleFactory;
@@ -111,10 +109,10 @@ public class DruidSchema extends AbstractSchema
   // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
   private final Object lock = new Object();
 
-  // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
+  // DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
   // Use TreeMap for segments so they are merged in deterministic order, from older to newer.
   @GuardedBy("lock")
-  private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
+  private final Map<String, TreeMap<DataSegment, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
   private int totalSegments = 0;
 
   // All mutable segments.
@@ -356,24 +354,22 @@ public class DruidSchema extends AbstractSchema
   void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
     synchronized (lock) {
-      final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      SegmentMetadataHolder holder = knownSegments != null ? knownSegments.get(segment) : null;
-      if (holder == null) {
+      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment) : null;
+      if (segmentMetadata == null) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
         final Set<String> servers = ImmutableSet.of(server.getName());
-        holder = SegmentMetadataHolder.builder(
+        segmentMetadata = AvailableSegmentMetadata.builder(
             segment.getId(),
-            DEFAULT_IS_PUBLISHED,
-            DEFAULT_IS_AVAILABLE,
             isRealtime,
             servers,
             null,
             DEFAULT_NUM_ROWS
         ).build();
         // Unknown segment.
-        setSegmentMetadataHolder(segment, holder);
+        setAvailableSegmentMetadata(segment, segmentMetadata);
         segmentsNeedingRefresh.add(segment);
         if (!server.segmentReplicatable()) {
           log.debug("Added new mutable segment[%s].", segment.getId());
@@ -382,16 +378,16 @@ public class DruidSchema extends AbstractSchema
           log.debug("Added new immutable segment[%s].", segment.getId());
         }
       } else {
-        final Set<String> segmentServers = holder.getReplicas();
+        final Set<String> segmentServers = segmentMetadata.getReplicas();
         final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
             .addAll(segmentServers)
             .add(server.getName())
             .build();
-        final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
-            .from(holder)
+        final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
+            .from(segmentMetadata)
             .withReplicas(servers)
             .build();
-        knownSegments.put(segment, holderWithNumReplicas);
+        knownSegments.put(segment, metadataWithNumReplicas);
         if (server.segmentReplicatable()) {
           // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
           // even if it's also available on non-replicatable (realtime) servers.
@@ -417,7 +413,7 @@ public class DruidSchema extends AbstractSchema
       segmentsNeedingRefresh.remove(segment);
       mutableSegments.remove(segment);
 
-      final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
+      final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
           segmentMetadataInfo.get(segment.getDataSource());
       if (dataSourceSegments.remove(segment) != null) {
         totalSegments--;
@@ -437,17 +433,17 @@
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
-      final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      final SegmentMetadataHolder holder = knownSegments.get(segment);
-      final Set<String> segmentServers = holder.getReplicas();
+      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment);
+      final Set<String> segmentServers = segmentMetadata.getReplicas();
       final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
           .filter(Predicates.not(Predicates.equalTo(server.getName())))
           .toSet();
-      final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
-          .from(holder)
+      final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
+          .from(segmentMetadata)
          .withReplicas(servers)
          .build();
-      knownSegments.put(segment, holderWithNumReplicas);
+      knownSegments.put(segment, metadataWithNumReplicas);
       lock.notifyAll();
     }
   }
@@ -511,25 +507,25 @@
     synchronized (lock) {
       final RowSignature rowSignature = analysisToRowSignature(analysis);
       log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
-      final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
+      final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
          segmentMetadataInfo.get(segment.getDataSource());
       if (dataSourceSegments == null) {
         log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
       } else {
-        SegmentMetadataHolder holder = dataSourceSegments.get(segment);
-        if (holder == null) {
+        final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segment);
+        if (segmentMetadata == null) {
           log.warn(
               "No segment[%s] found, skipping refresh",
              segment.getId()
          );
         } else {
-          SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
-              .from(holder)
+          final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
+              .from(segmentMetadata)
              .withRowSignature(rowSignature)
              .withNumRows(analysis.getNumRows())
              .build();
-          dataSourceSegments.put(segment, updatedHolder);
-          setSegmentMetadataHolder(segment, updatedHolder);
+          dataSourceSegments.put(segment, updatedSegmentMetadata);
+          setAvailableSegmentMetadata(segment, updatedSegmentMetadata);
           retVal.add(segment);
         }
       }
@@ -555,14 +551,14 @@
   }
 
   @VisibleForTesting
-  void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
+  void setAvailableSegmentMetadata(final DataSegment segment, final AvailableSegmentMetadata availableSegmentMetadata)
   {
     synchronized (lock) {
-      TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
+      TreeMap<DataSegment, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
          segment.getDataSource(),
          x -> new TreeMap<>(SEGMENT_ORDER)
      );
-      if (dataSourceSegments.put(segment, segmentMetadataHolder) == null) {
+      if (dataSourceSegments.put(segment, availableSegmentMetadata) == null) {
         totalSegments++;
       }
     }
@@ -571,12 +567,12 @@
   private DruidTable buildDruidTable(final String dataSource)
   {
     synchronized (lock) {
-      final Map<DataSegment, SegmentMetadataHolder> segmentMap = segmentMetadataInfo.get(dataSource);
+      final Map<DataSegment, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
       final Map<String, ValueType> columnTypes = new TreeMap<>();
 
       if (segmentMap != null) {
-        for (SegmentMetadataHolder segmentMetadataHolder : segmentMap.values()) {
-          final RowSignature rowSignature = segmentMetadataHolder.getRowSignature();
+        for (AvailableSegmentMetadata availableSegmentMetadata : segmentMap.values()) {
+          final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
           if (rowSignature != null) {
             for (String column : rowSignature.getRowOrder()) {
               // Newer column types should override older ones.
@@ -647,11 +643,11 @@
     return rowSignatureBuilder.build();
   }
 
-  Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
+  Map<DataSegment, AvailableSegmentMetadata> getSegmentMetadata()
   {
-    final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
+    final Map<DataSegment, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
     synchronized (lock) {
-      for (TreeMap<DataSegment, SegmentMetadataHolder> val : segmentMetadataInfo.values()) {
+      for (TreeMap<DataSegment, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
         segmentMetadata.putAll(val);
       }
     }
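A small aside on why isRealtime() stays a long rather than a boolean (per the comment kept in AvailableSegmentMetadata): consumers of the snapshot returned by getSegmentMetadata() can count segment states with a plain sum. A sketch, not code from this patch; druidSchema stands in for an injected DruidSchema instance:

    // Illustrative only: count realtime segments from the per-segment metadata snapshot.
    Map<DataSegment, AvailableSegmentMetadata> snapshot = druidSchema.getSegmentMetadata();
    long realtimeSegments = snapshot.values()
                                    .stream()
                                    .mapToLong(AvailableSegmentMetadata::isRealtime)
                                    .sum();
    long totalSegments = snapshot.size();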
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 6316b40f991..29f98161966 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -179,6 +179,9 @@
     return tableMap;
   }
 
+  /**
+   * This table contains a row per segment from the metadata store as well as per served segment.
+   */
   static class SegmentsTable extends AbstractTable implements ScannableTable
   {
     private final DruidSchema druidSchema;
@@ -186,6 +189,14 @@
     private final AuthorizerMapper authorizerMapper;
     private final MetadataSegmentView metadataView;
 
+    /**
+     * Boolean constants used for available segments, represented as long type
+     * where 1 = true and 0 = false, to make it easy to count the number of segments
+     * which are published or available.
+     */
+    private static final long DEFAULT_IS_PUBLISHED = 0;
+    private static final long DEFAULT_IS_AVAILABLE = 1;
+
     public SegmentsTable(
         DruidSchema druidSchemna,
         MetadataSegmentView metadataView,
@@ -215,27 +226,27 @@ public Enumerable<Object[]> scan(DataContext root)
     {
       //get available segments from druidSchema
-      final Map<DataSegment, SegmentMetadataHolder> availableSegmentMetadata = druidSchema.getSegmentMetadata();
-      final Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries =
+      final Map<DataSegment, AvailableSegmentMetadata> availableSegmentMetadata = druidSchema.getSegmentMetadata();
+      final Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries =
          availableSegmentMetadata.entrySet().iterator();
 
       // in memory map to store segment data from available segments
       final Map<SegmentId, PartialSegmentData> partialSegmentDataMap =
          Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
-      for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
+      for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
-            new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
+            new PartialSegmentData(DEFAULT_IS_AVAILABLE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
         partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
       }
 
       //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
-      final Iterator<DataSegment> metadataSegments = metadataView.getPublishedSegments();
+      final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();
 
       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
 
       final FluentIterable<Object[]> publishedSegments = FluentIterable
          .from(() -> getAuthorizedPublishedSegments(
-              metadataSegments,
+              metadataStoreSegments,
              root
          ))
          .transform(val -> {
@@ -292,8 +303,8 @@
              Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
              numReplicas,
              val.getValue().getNumRows(),
-              val.getValue().isPublished(),
-              val.getValue().isAvailable(),
+              DEFAULT_IS_PUBLISHED,
+              DEFAULT_IS_AVAILABLE,
              val.getValue().isRealtime(),
              jsonMapper.writeValueAsString(val.getKey())
          };
@@ -331,18 +342,18 @@
       return authorizedSegments.iterator();
     }
 
-    private Iterator<Entry<DataSegment, SegmentMetadataHolder>> getAuthorizedAvailableSegments(
-        Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries,
+    private Iterator<Entry<DataSegment, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
+        Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries,
         DataContext root
     )
     {
       final AuthenticationResult authenticationResult =
          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
 
-      Function<Entry<DataSegment, SegmentMetadataHolder>, Iterable<ResourceAction>> raGenerator = segment -> Collections
+      Function<Entry<DataSegment, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
          .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource()));
 
-      final Iterable<Entry<DataSegment, SegmentMetadataHolder>> authorizedSegments =
+      final Iterable<Entry<DataSegment, AvailableSegmentMetadata>> authorizedSegments =
          AuthorizationUtils.filterAuthorizedResources(
              authenticationResult,
              () -> availableSegmentEntries,
@@ -396,6 +407,10 @@
     }
   }
 
+  /**
+   * This table contains a row per server. At this time it only contains the
+   * data servers (i.e. historicals and peons).
+   */
   static class ServersTable extends AbstractTable implements ScannableTable
   {
     private final TimelineServerView serverView;
@@ -449,6 +464,9 @@
     }
   }
 
+  /**
+   * This table contains a row per segment per server.
+   */
   static class ServerSegmentsTable extends AbstractTable implements ScannableTable
   {
     private final TimelineServerView serverView;
@@ -490,6 +508,9 @@
     }
   }
 
+  /**
+   * This table contains a row per task.
+   */
   static class TasksTable extends AbstractTable implements ScannableTable
   {
     private final DruidLeaderClient druidLeaderClient;
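To make the intent of DEFAULT_IS_PUBLISHED and DEFAULT_IS_AVAILABLE concrete: the flags are no longer stored per segment in DruidSchema, so SegmentsTable derives them while scanning. The sketch below is a simplification of what scan() implies after this patch, not the method itself; it assumes PartialSegmentData exposes the availability flag it was constructed with, and the published-segment branch sketched here is not changed by this diff:

    // Row for a segment coming from the metadata store: published by definition; available only if
    // some server also announced it (i.e. it has an entry in partialSegmentDataMap).
    final PartialSegmentData partial = partialSegmentDataMap.get(segmentId);
    final long isPublishedForMetadataStoreRow = 1L;
    final long isAvailableForMetadataStoreRow = partial == null ? 0L : partial.isAvailable();

    // Row for a segment that is only served (announced but not yet published):
    final long isPublishedForServedOnlyRow = DEFAULT_IS_PUBLISHED; // 0
    final long isAvailableForServedOnlyRow = DEFAULT_IS_AVAILABLE; // 1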
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index ba44081fc71..d607587db4a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -247,14 +247,14 @@
   }
 
   /**
-   * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case
+   * This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case
    * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
    * is called more than once for same segment
    */
   @Test
   public void testSegmentMetadataHolderNumRows()
   {
-    Map<DataSegment, SegmentMetadataHolder> segmentsMetadata = schema.getSegmentMetadata();
+    Map<DataSegment, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadata();
     final Set<DataSegment> segments = segmentsMetadata.keySet();
     Assert.assertEquals(3, segments.size());
     // find the only segment with datasource "foo2"
@@ -263,10 +263,10 @@
         .findFirst()
         .orElse(null);
     Assert.assertNotNull(existingSegment);
-    final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment);
-    // update SegmentMetadataHolder of existingSegment with numRows=5
-    SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
-    schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
+    final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment);
+    // update AvailableSegmentMetadata of existingSegment with numRows=5
+    AvailableSegmentMetadata updatedMetadata = AvailableSegmentMetadata.from(existingMetadata).withNumRows(5).build();
+    schema.setAvailableSegmentMetadata(existingSegment, updatedMetadata);
     // find a druidServer holding existingSegment
     final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
         .stream()
@@ -289,19 +289,17 @@
         .filter(segment -> segment.getDataSource().equals("foo2"))
         .findFirst()
         .orElse(null);
-    final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment);
-    Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId());
-    Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows());
+    final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment);
+    Assert.assertEquals(updatedMetadata.getSegmentId(), currentMetadata.getSegmentId());
+    Assert.assertEquals(updatedMetadata.getNumRows(), currentMetadata.getNumRows());
     // numreplicas do not change here since we addSegment with the same server which was serving existingSegment before
-    Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas());
-    Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable());
-    Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished());
+    Assert.assertEquals(updatedMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
   }
 
   @Test
   public void testNullDatasource() throws IOException
   {
-    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
     Set<DataSegment> segments = segmentMetadatas.keySet();
     Assert.assertEquals(segments.size(), 3);
     // segments contains two segments with datasource "foo" and one with datasource "foo2"
@@ -319,9 +317,9 @@
   }
 
   @Test
-  public void testNullSegmentMetadataHolder() throws IOException
+  public void testNullAvailableSegmentMetadata() throws IOException
   {
-    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
     Set<DataSegment> segments = segmentMetadatas.keySet();
     Assert.assertEquals(segments.size(), 3);
     // remove one of the segments with datasource "foo"
@@ -331,7 +329,7 @@
         .orElse(null);
     Assert.assertFalse(segmentToRemove == null);
     schema.removeSegment(segmentToRemove);
-    schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from
+    schema.refreshSegments(segments); // can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
     segmentMetadatas = schema.getSegmentMetadata();
     segments = segmentMetadatas.keySet();
     Assert.assertEquals(segments.size(), 2);