From 15d19f3059a990f022b1cb98bdd4e6a82cdf9a06 Mon Sep 17 00:00:00 2001
From: Surekha
Date: Wed, 1 May 2019 09:00:57 -0700
Subject: [PATCH] Add is_overshadowed column to sys.segments table (#7425)

* Add is_overshadowed column to sys.segments table
* update docs
* Rename class and variables
* PR comments
* PR comments
* remove unused variables in MetadataResource
* move constants together
* add getFullyOvershadowedSegments method to ImmutableDruidDataSource
* Fix compareTo of SegmentWithOvershadowedStatus
* PR comment
* PR comments
* PR comments
* PR comments
* PR comments
* fix issue with already consumed stream
* minor refactoring
* PR comments
---
 .../SegmentWithOvershadowedStatus.java        | 90 +++++++++++++++++++
 docs/content/querying/sql.md                  |  1 +
 .../client/ImmutableDruidDataSource.java      | 40 +++++++++
 .../helper/DruidCoordinatorRuleRunner.java    | 30 +------
 .../druid/server/http/MetadataResource.java   | 64 +++++++++++--
 .../calcite/schema/MetadataSegmentView.java   | 69 +++++++-------
 .../sql/calcite/schema/SystemSchema.java      | 64 +++++++------
 .../sql/calcite/schema/SystemSchemaTest.java  | 44 +++++----
 8 files changed, 289 insertions(+), 113 deletions(-)
 create mode 100644 core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java

diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
new file mode 100644
index 00000000000..e86daea7d86
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * DataSegment object plus the overshadowed status for the segment. An immutable object.
+ *
+ * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
+ * of the DataSegment object.
+ */
+public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
+{
+  private final boolean overshadowed;
+  private final DataSegment dataSegment;
+
+  @JsonCreator
+  public SegmentWithOvershadowedStatus(
+      @JsonProperty("dataSegment") DataSegment dataSegment,
+      @JsonProperty("overshadowed") boolean overshadowed
+  )
+  {
+    this.dataSegment = dataSegment;
+    this.overshadowed = overshadowed;
+  }
+
+  @JsonProperty
+  public boolean isOvershadowed()
+  {
+    return overshadowed;
+  }
+
+  @JsonProperty
+  public DataSegment getDataSegment()
+  {
+    return dataSegment;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SegmentWithOvershadowedStatus)) {
+      return false;
+    }
+    final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
+    if (!dataSegment.equals(that.dataSegment)) {
+      return false;
+    }
+    if (overshadowed != that.overshadowed) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dataSegment.hashCode();
+    result = 31 * result + Boolean.hashCode(overshadowed);
+    return result;
+  }
+
+  @Override
+  public int compareTo(SegmentWithOvershadowedStatus o)
+  {
+    return dataSegment.getId().compareTo(o.dataSegment.getId());
+  }
+}
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 9e97138dd20..1fc1243f0a3 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -612,6 +612,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His
 |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`|
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)|
 |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks|
+|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet.|
 |payload|STRING|JSON-serialized data segment payload|

 For example to retrieve all segments for datasource "wikipedia", use the query:
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
index 59539443b61..841b7169458 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
@@ -25,12 +25,17 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Ordering;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@@ -109,6 +114,41 @@ public class ImmutableDruidDataSource
     return totalSizeOfSegments;
   }
 
+  /**
+   * This method finds the overshadowed segments from the given segments
+   *
+   * @return set of overshadowed segments
+   */
+  public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
+
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (DataSegment dataSegment : segments) {
+      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
+        overshadowedSegments.add(dataSegment);
+      }
+    }
+    return overshadowedSegments;
+  }
+
+  /**
+   * Builds a timeline from given segments
+   *
+   * @return map of datasource to VersionedIntervalTimeline of segments
+   */
+  private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
+      Iterable<DataSegment> segments
+  )
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    segments.forEach(segment -> timelines
+        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
+    return timelines;
+  }
+
   @Override
   public String toString()
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index 7d7830170c9..bbceaafed27 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -20,7 +20,7 @@
 package org.apache.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataRuleManager;
@@ -32,14 +32,9 @@ import org.apache.druid.server.coordinator.ReplicationThrottler;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.DateTime;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -89,7 +84,8 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
     // find available segments which are not overshadowed by other segments in DB
     // only those would need to be loaded/dropped
     // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
-    Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
+    final Set<DataSegment> overshadowed = ImmutableDruidDataSource
+        .determineOvershadowedSegments(params.getAvailableSegments());
 
     for (String tier : cluster.getTierNames()) {
       replicatorThrottler.updateReplicationState(tier);
@@ -138,24 +134,4 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
 
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
-
-  private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
-  {
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
-    for (DataSegment segment : params.getAvailableSegments()) {
-      timelines
-          .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
-          .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
-
-    Set<DataSegment> overshadowed = new HashSet<>();
-    for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
-      for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
-        for (DataSegment dataSegment : holder.getObject().payloads()) {
-          overshadowed.add(dataSegment);
-        }
-      }
-    }
-    return overshadowed;
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index da0abe79aea..3c7e8ac37b8 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -37,6 +37,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.joda.time.Interval;
 
 import javax.servlet.http.HttpServletRequest;
@@ -51,6 +52,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -147,7 +149,8 @@ public class MetadataResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response getDatabaseSegments(
       @Context final HttpServletRequest req,
-      @QueryParam("datasources") final Set<String> datasources
+      @QueryParam("datasources") final Set<String> datasources,
+      @QueryParam("includeOvershadowedStatus") final String includeOvershadowedStatus
   )
   {
     // If we haven't polled the metadata store yet, use an empty list of datasources.
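For illustration, here is one way a client could exercise the new query parameter added above. This sketch is not part of the patch: the coordinator address is an assumption, and it reads the response as a generic JSON tree rather than deserializing DataSegment (which would need an ObjectMapper configured with Druid's injectable values). Per the @JsonProperty annotations on SegmentWithOvershadowedStatus, each array element has the shape {"dataSegment": {...}, "overshadowed": true|false}.

// Illustrative only: list the datasources of overshadowed segments via the new flag.
// Assumes a coordinator listening on localhost:8081.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.InputStream;
import java.net.URL;

public class OvershadowedStatusClient
{
  public static void main(String[] args) throws Exception
  {
    final URL url = new URL(
        "http://localhost:8081/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus");
    final ObjectMapper mapper = new ObjectMapper();
    try (InputStream in = url.openStream()) {
      // The endpoint returns a JSON array of SegmentWithOvershadowedStatus objects.
      for (JsonNode segment : mapper.readTree(in)) {
        if (segment.get("overshadowed").asBoolean()) {
          System.out.println(segment.get("dataSegment").get("dataSource").asText());
        }
      }
    }
  }
}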
@@ -159,14 +162,61 @@ public class MetadataResource
     }
 
     final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());
 
-    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-        AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+    if (includeOvershadowedStatus != null) {
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
+          req,
+          druidDataSources,
+          metadataSegments
+      );
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    } else {
 
-    final Iterable<DataSegment> authorizedSegments =
-        AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper);
+      final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
 
-    final Response.ResponseBuilder builder = Response.status(Response.Status.OK);
-    return builder.entity(authorizedSegments).build();
+      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          req,
+          metadataSegments::iterator,
+          raGenerator,
+          authorizerMapper
+      );
+
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    }
+  }
+
+  private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
+      HttpServletRequest req,
+      Collection<ImmutableDruidDataSource> druidDataSources,
+      Stream<DataSegment> metadataSegments
+  )
+  {
+    // It's fine to add all overshadowed segments to a single collection because only
+    // a small fraction of the segments in the cluster are expected to be overshadowed,
+    // so building this collection shouldn't generate a lot of garbage.
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (ImmutableDruidDataSource dataSource : druidDataSources) {
+      overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
+    }
+
+    final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+            segment,
+            overshadowedSegments.contains(segment)
+        ));
+
+    final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
+        .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
+
+    final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+        req,
+        segmentsWithOvershadowedStatus::iterator,
+        raGenerator,
+        authorizerMapper
+    );
+    return authorizedSegments;
   }
 
   @GET
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 50fe3133cd2..18d288eb691 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -23,7 +23,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.Inject;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.DataSegmentInterner;
@@ -32,7 +34,6 @@ import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -43,19 +44,17 @@ import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.DateTime;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class polls the coordinator in background to keep the latest published segments.
@@ -73,12 +72,19 @@ public class MetadataSegmentView
   private final BrokerSegmentWatcherConfig segmentWatcherConfig;
   private final boolean isCacheEnabled;
-  @Nullable
-  private final ConcurrentMap<DataSegment, DateTime> publishedSegments;
+  /**
+   * Use {@link ImmutableSortedSet} so that the order of segments is deterministic and
+   * sys.segments queries return the segments in sorted order based on segmentId.
+   *
+   * Volatile since this reference is reassigned in {@code poll()} and then read in
+   * {@code getPublishedSegments()} from other threads.
+   */
+  @MonotonicNonNull
+  private volatile ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null;
   private final ScheduledExecutorService scheduledExec;
   private final long pollPeriodInMS;
   private final LifecycleLock lifecycleLock = new LifecycleLock();
-  private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
+  private final CountDownLatch cachePopulated = new CountDownLatch(1);
 
   @Inject
   public MetadataSegmentView(
@@ -96,7 +102,6 @@ public class MetadataSegmentView
     this.segmentWatcherConfig = segmentWatcherConfig;
     this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
     this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
-    this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null;
     this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
   }
 
@@ -134,40 +139,32 @@ public class MetadataSegmentView
   private void poll()
   {
     log.info("polling published segments from coordinator");
-    final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
+    final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
         coordinatorDruidLeaderClient,
         jsonMapper,
         responseHandler,
         segmentWatcherConfig.getWatchedDataSources()
     );
 
-    final DateTime timestamp = DateTimes.nowUtc();
+    final ImmutableSortedSet.Builder<SegmentWithOvershadowedStatus> builder = ImmutableSortedSet.naturalOrder();
     while (metadataSegments.hasNext()) {
-      final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next());
-      // timestamp is used to filter deleted segments
-      publishedSegments.put(interned, timestamp);
+      final SegmentWithOvershadowedStatus segment = metadataSegments.next();
+      final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
+      final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus(
+          interned,
+          segment.isOvershadowed()
+      );
+      builder.add(segmentWithOvershadowedStatus);
     }
-    // filter the segments from cache whose timestamp is not equal to latest timestamp stored,
-    // since the presence of a segment with an earlier timestamp indicates that
-    // "that" segment is not returned by coordinator in latest poll, so it's
-    // likely deleted and therefore we remove it from publishedSegments
-    // Since segments are not atomically replaced because it can cause high
-    // memory footprint due to large number of published segments, so
-    // we are incrementally removing deleted segments from the map
-    // This means publishedSegments will be eventually consistent with
-    // the segments in coordinator
-    publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp);
-    cachePopulated.set(true);
+    publishedSegments = builder.build();
+    cachePopulated.countDown();
   }
 
-  public Iterator<DataSegment> getPublishedSegments()
+  public Iterator<SegmentWithOvershadowedStatus> getPublishedSegments()
   {
     if (isCacheEnabled) {
-      Preconditions.checkState(
-          lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(),
-          "hold on, still syncing published segments"
-      );
-      return publishedSegments.keySet().iterator();
+      Uninterruptibles.awaitUninterruptibly(cachePopulated);
+      return publishedSegments.iterator();
     } else {
       return getMetadataSegments(
           coordinatorDruidLeaderClient,
@@ -179,14 +176,14 @@ public class MetadataSegmentView
   }
 
   // Note that coordinator must be up to get segments
-  private JsonParserIterator<DataSegment> getMetadataSegments(
+  private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
       DruidLeaderClient coordinatorClient,
       ObjectMapper jsonMapper,
       BytesAccumulatingResponseHandler responseHandler,
       Set<String> watchedDataSources
   )
   {
-    String query = "/druid/coordinator/v1/metadata/segments";
+    String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus";
     if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
       log.debug(
           "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources);
@@ -195,7 +192,7 @@ public class MetadataSegmentView
         sb.append("datasources=").append(ds).append("&");
       }
       sb.setLength(sb.length() - 1);
-      query = "/druid/coordinator/v1/metadata/segments?" + sb;
+      query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb;
     }
     Request request;
     try {
@@ -213,7 +210,7 @@ public class MetadataSegmentView
         responseHandler
     );
 
-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentWithOvershadowedStatus>()
     {
     });
     return new JsonParserIterator<>(
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 29f98161966..18f4c31b595 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -69,6 +69,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
 import javax.annotation.Nullable;
@@ -92,6 +93,17 @@ public class SystemSchema extends AbstractSchema
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  /**
+   * Boolean constants represented as long type, where 1 = true and 0 = false,
+   * to make it easy to count the number of segments that are published, available, etc.
+   */
+  private static final long IS_PUBLISHED_FALSE = 0L;
+  private static final long IS_PUBLISHED_TRUE = 1L;
+  private static final long IS_AVAILABLE_TRUE = 1L;
+  private static final long IS_OVERSHADOWED_FALSE = 0L;
+  private static final long IS_OVERSHADOWED_TRUE = 1L;
+
   static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -106,6 +118,7 @@ public class SystemSchema extends AbstractSchema
       .add("is_published", ValueType.LONG)
       .add("is_available", ValueType.LONG)
       .add("is_realtime", ValueType.LONG)
+      .add("is_overshadowed", ValueType.LONG)
       .add("payload", ValueType.STRING)
       .build();
 
@@ -189,14 +202,6 @@ public class SystemSchema extends AbstractSchema
     private final AuthorizerMapper authorizerMapper;
     private final MetadataSegmentView metadataView;
 
-    /**
-     * Booleans constants used for available segments represented as long type,
-     * where 1 = true and 0 = false to make it easy to count number of segments
-     * which are published, available
-     */
-    private static final long DEFAULT_IS_PUBLISHED = 0;
-    private static final long DEFAULT_IS_AVAILABLE = 1;
-
     public SegmentsTable(
         DruidSchema druidSchemna,
         MetadataSegmentView metadataView,
@@ -235,12 +240,12 @@ public class SystemSchema extends AbstractSchema
           Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
       for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
-            new PartialSegmentData(DEFAULT_IS_AVAILABLE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
+            new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
         partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
       }
 
       //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
-      final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();
+      final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();
 
       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
 
@@ -251,8 +256,9 @@ public class SystemSchema extends AbstractSchema
           ))
           .transform(val -> {
             try {
-              segmentsAlreadySeen.add(val.getId());
-              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId());
+              final DataSegment segment = val.getDataSegment();
+              segmentsAlreadySeen.add(segment.getId());
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
               long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
               if (partialSegmentData != null) {
                 numReplicas = partialSegmentData.getNumReplicas();
@@ -261,23 +267,24 @@ public class SystemSchema extends AbstractSchema
                 isRealtime = partialSegmentData.isRealtime();
               }
               return new Object[]{
-                  val.getId(),
-                  val.getDataSource(),
-                  val.getInterval().getStart().toString(),
-                  val.getInterval().getEnd().toString(),
-                  val.getSize(),
-                  val.getVersion(),
-                  Long.valueOf(val.getShardSpec().getPartitionNum()),
+                  segment.getId(),
+                  segment.getDataSource(),
+                  segment.getInterval().getStart().toString(),
+                  segment.getInterval().getEnd().toString(),
+                  segment.getSize(),
+                  segment.getVersion(),
+                  Long.valueOf(segment.getShardSpec().getPartitionNum()),
                   numReplicas,
                   numRows,
-                  1L, //is_published is true for published segments
+                  IS_PUBLISHED_TRUE, //is_published is true for published segments
                   isAvailable,
                   isRealtime,
+                  val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
                   jsonMapper.writeValueAsString(val)
               };
             }
             catch (JsonProcessingException e) {
-              throw new RE(e, "Error getting segment payload for segment %s", val.getId());
+              throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId());
             }
           });
 
@@ -303,9 +310,10 @@ public class SystemSchema extends AbstractSchema
                   Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
                   numReplicas,
                   val.getValue().getNumRows(),
-                  DEFAULT_IS_PUBLISHED,
-                  DEFAULT_IS_AVAILABLE,
+                  IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
+                  IS_AVAILABLE_TRUE, // is_available is assumed to be always true for segments announced by historicals or realtime tasks
                   val.getValue().isRealtime(),
+                  IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
                   jsonMapper.writeValueAsString(val.getKey())
               };
             }
@@ -322,18 +330,18 @@ public class SystemSchema extends AbstractSchema
 
     }
 
-    private Iterator<DataSegment> getAuthorizedPublishedSegments(
-        Iterator<DataSegment> it,
+    private Iterator<SegmentWithOvershadowedStatus> getAuthorizedPublishedSegments(
+        Iterator<SegmentWithOvershadowedStatus> it,
         DataContext root
     )
     {
       final AuthenticationResult authenticationResult =
          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
 
-      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+      Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
 
-      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
           authenticationResult,
           () -> it,
           raGenerator,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index d354f141d9a..a942db4b556 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -76,6 +76,7 @@
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -381,7 +382,7 @@ public class SystemSchemaTest extends CalciteTestBase
 
     final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
-    Assert.assertEquals(13, fields.size());
+    Assert.assertEquals(14, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@@ -408,11 +409,14 @@ public class SystemSchemaTest extends CalciteTestBase
         .withConstructor(druidSchema, metadataView, mapper, authMapper)
         .createMock();
     EasyMock.replay(segmentsTable);
-    final Set<DataSegment> publishedSegments = Stream.of(publishedSegment1,
-                                                         publishedSegment2,
-                                                         publishedSegment3,
-                                                         segment1,
-                                                         segment2).collect(Collectors.toSet());
+    final Set<SegmentWithOvershadowedStatus> publishedSegments = Stream.of(
+        new SegmentWithOvershadowedStatus(publishedSegment1, true),
+        new SegmentWithOvershadowedStatus(publishedSegment2, false),
+        new SegmentWithOvershadowedStatus(publishedSegment3, false),
+        new SegmentWithOvershadowedStatus(segment1, true),
+        new SegmentWithOvershadowedStatus(segment2, false)
+    ).collect(Collectors.toSet());
+
     EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
 
     EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
@@ -463,7 +467,8 @@ public class SystemSchemaTest extends CalciteTestBase
         3L, //numRows
         1L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
     );
 
     verifyRow(
@@ -475,7 +480,8 @@ public class SystemSchemaTest extends CalciteTestBase
         3L, //numRows
         1L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
@@ -488,7 +494,8 @@ public class SystemSchemaTest extends CalciteTestBase
         2L, //numRows
         0L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -500,7 +507,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         0L, //is_published
         1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -512,7 +520,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         0L, //is_published
         1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
     );
 
     // wikipedia segments are published and unavailable, num_replicas is 0
@@ -525,7 +534,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
     );
 
     verifyRow(
@@ -537,7 +547,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -549,7 +560,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     // Verify value types.
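Note that the overshadowed flags verified above are stubbed into the mocked MetadataSegmentView; in production they come from ImmutableDruidDataSource.determineOvershadowedSegments(...). A minimal standalone sketch of what that method reports, not part of the patch and assuming DataSegment.builder()'s defaults suffice for a toy segment:

// Illustrative only: a higher-versioned segment over the same interval fully
// overshadows the lower-versioned one.
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;

import java.util.Set;

public class OvershadowSemanticsExample
{
  public static void main(String[] args)
  {
    final DataSegment v1 = DataSegment.builder()
                                      .dataSource("wikipedia")
                                      .interval(Intervals.of("2019-04-01/2019-04-02"))
                                      .version("v1")
                                      .size(0)
                                      .build();
    // Same datasource and interval, higher version: v2 fully covers v1.
    final DataSegment v2 = DataSegment.builder()
                                      .dataSource("wikipedia")
                                      .interval(Intervals.of("2019-04-01/2019-04-02"))
                                      .version("v2")
                                      .size(0)
                                      .build();
    final Set<DataSegment> overshadowed =
        ImmutableDruidDataSource.determineOvershadowedSegments(ImmutableList.of(v1, v2));
    // Prints only the v1 segment id.
    overshadowed.forEach(segment -> System.out.println(segment.getId()));
  }
}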
@@ -565,7 +577,8 @@ public class SystemSchemaTest extends CalciteTestBase
       long numRows,
       long isPublished,
       long isAvailable,
-      long isRealtime)
+      long isRealtime,
+      long isOvershadowed)
   {
     Assert.assertEquals(segmentId, row[0].toString());
     SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -580,6 +593,7 @@ public class SystemSchemaTest extends CalciteTestBase
     Assert.assertEquals(isPublished, row[9]);
     Assert.assertEquals(isAvailable, row[10]);
     Assert.assertEquals(isRealtime, row[11]);
+    Assert.assertEquals(isOvershadowed, row[12]);
   }
 
   @Test
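Finally, an end-to-end usage sketch for the new column. This is not part of the patch: it assumes a broker at localhost:8082 with the Avatica JDBC driver on the classpath, and uses the is_published = 1 AND is_overshadowed = 0 filter suggested in the sql.md change above.

// Illustrative only: find published segments that are not overshadowed,
// i.e. the segments that "should be published".
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class IsOvershadowedQuery
{
  public static void main(String[] args) throws Exception
  {
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
    final String sql = "SELECT segment_id, is_published, is_overshadowed\n"
                       + "FROM sys.segments\n"
                       + "WHERE is_published = 1 AND is_overshadowed = 0";
    try (Connection connection = DriverManager.getConnection(url);
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(sql)) {
      while (resultSet.next()) {
        System.out.println(resultSet.getString("segment_id"));
      }
    }
  }
}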