Add is_overshadowed column to sys.segments table (#7425)

* Add is_overshadowed column to sys.segments table

* update docs

* Rename class and variables

* PR comments

* PR comments

* remove unused variables in MetadataResource

* move constants together

* add getFullyOvershadowedSegments method to ImmutableDruidDataSource

* Fix compareTo of SegmentWithOvershadowedStatus

* PR comment

* PR comments

* PR comments

* PR comments

* PR comments

* fix issue with already consumed stream

* minor refactoring

* PR comments
Surekha 2019-05-01 09:00:57 -07:00 committed by Roman Leventov
parent d97c0d19a0
commit 15d19f3059
8 changed files with 289 additions and 113 deletions

File: org.apache.druid.timeline.SegmentWithOvershadowedStatus (new file)

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
*
* SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
* of the DataSegment object.
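 *
 * Note that this natural ordering is narrower than {@link #equals}, which also compares the
 * overshadowed flag, so instances should not be placed in sorted collections that assume
 * compareTo and equals are consistent.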
*/
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
{
  private final boolean overshadowed;
  private final DataSegment dataSegment;

  @JsonCreator
  public SegmentWithOvershadowedStatus(
      @JsonProperty("dataSegment") DataSegment dataSegment,
      @JsonProperty("overshadowed") boolean overshadowed
  )
  {
    this.dataSegment = dataSegment;
    this.overshadowed = overshadowed;
  }

  @JsonProperty
  public boolean isOvershadowed()
  {
    return overshadowed;
  }

  @JsonProperty
  public DataSegment getDataSegment()
  {
    return dataSegment;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SegmentWithOvershadowedStatus)) {
      return false;
    }
    final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
    if (!dataSegment.equals(that.dataSegment)) {
      return false;
    }
    if (overshadowed != that.overshadowed) {
      return false;
    }
    return true;
  }

  @Override
  public int hashCode()
  {
    int result = dataSegment.hashCode();
    result = 31 * result + Boolean.hashCode(overshadowed);
    return result;
  }

  @Override
  public int compareTo(SegmentWithOvershadowedStatus o)
  {
    return dataSegment.getId().compareTo(o.dataSegment.getId());
  }
}

File: docs (sys.segments table reference)

@@ -612,6 +612,7 @@ Note that a segment can be served by more than one stream ingestion task or Historical
|is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`|
|is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process (Historical or realtime)|
|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served by any type of realtime task|
+|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet.|
|payload|STRING|JSON-serialized data segment payload|

For example, to retrieve all segments for datasource "wikipedia", use the query:
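As a concrete companion to the new column, here is a minimal sketch of running the filter recommended above over Druid's SQL JDBC (Avatica) endpoint. The Broker address and the presence of the Avatica driver on the classpath are assumptions; the filter expression itself comes from the column description.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class OvershadowedSegmentsCheck
{
  public static void main(String[] args) throws Exception
  {
    // Assumes a Broker at localhost:8082 and the Avatica JDBC driver on the classpath.
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
    try (Connection connection = DriverManager.getConnection(url);
         Statement statement = connection.createStatement();
         // Segments that "should be published": published and not fully overshadowed.
         ResultSet resultSet = statement.executeQuery(
             "SELECT segment_id FROM sys.segments WHERE is_published = 1 AND is_overshadowed = 0"
         )) {
      while (resultSet.next()) {
        System.out.println(resultSet.getString("segment_id"));
      }
    }
  }
}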

File: org.apache.druid.client.ImmutableDruidDataSource

@@ -25,12 +25,17 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;

import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;

/**
 * An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@@ -109,6 +114,41 @@ public class ImmutableDruidDataSource
    return totalSizeOfSegments;
  }

+  /**
+   * Finds the overshadowed segments among the given segments.
+   *
+   * @return the set of overshadowed segments
+   */
+  public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
+
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (DataSegment dataSegment : segments) {
+      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
+        overshadowedSegments.add(dataSegment);
+      }
+    }
+    return overshadowedSegments;
+  }
+
+  /**
+   * Builds a timeline for each datasource from the given segments.
+   *
+   * @return map of datasource name to a {@link VersionedIntervalTimeline} of its segments
+   */
+  private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
+      Iterable<DataSegment> segments
+  )
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    segments.forEach(segment -> timelines
+        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
+    return timelines;
+  }
+
  @Override
  public String toString()
  {
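For intuition about what `determineOvershadowedSegments` computes, here is a self-contained sketch of the core version-overshadow rule; it is my simplification, not the Druid API. It deliberately ignores partitioned shard specs and the case where several newer segments jointly cover an interval, both of which `VersionedIntervalTimeline.isOvershadowed` handles.

import java.util.List;

// Simplified stand-in for a segment's identity: datasource, interval, version.
final class SimpleSegment
{
  final String dataSource;
  final long intervalStart;
  final long intervalEnd;
  final String version;

  SimpleSegment(String dataSource, long intervalStart, long intervalEnd, String version)
  {
    this.dataSource = dataSource;
    this.intervalStart = intervalStart;
    this.intervalEnd = intervalEnd;
    this.version = version;
  }

  // A segment is overshadowed when a single segment of the same datasource with a
  // strictly newer version fully covers its interval (the real timeline also handles
  // covers assembled from several newer segments).
  static boolean isOvershadowed(SimpleSegment s, List<SimpleSegment> all)
  {
    return all.stream().anyMatch(
        o -> o.dataSource.equals(s.dataSource)
             && o.intervalStart <= s.intervalStart
             && o.intervalEnd >= s.intervalEnd
             && o.version.compareTo(s.version) > 0
    );
  }
}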

File: org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner

@@ -20,7 +20,7 @@
package org.apache.druid.server.coordinator.helper;

import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
@@ -32,14 +32,9 @@ import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime;

-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;

/**
@@ -89,7 +84,8 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
    // find available segments which are not overshadowed by other segments in DB
    // only those would need to be loaded/dropped
    // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
-    Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
+    final Set<DataSegment> overshadowed = ImmutableDruidDataSource
+        .determineOvershadowedSegments(params.getAvailableSegments());

    for (String tier : cluster.getTierNames()) {
      replicatorThrottler.updateReplicationState(tier);
@@ -138,24 +134,4 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper

    return params.buildFromExisting().withCoordinatorStats(stats).build();
  }
-
-  private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
-  {
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
-    for (DataSegment segment : params.getAvailableSegments()) {
-      timelines
-          .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
-          .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
-
-    Set<DataSegment> overshadowed = new HashSet<>();
-    for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
-      for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
-        for (DataSegment dataSegment : holder.getObject().payloads()) {
-          overshadowed.add(dataSegment);
-        }
-      }
-    }
-    return overshadowed;
-  }
}

File: MetadataResource

@@ -37,6 +37,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.joda.time.Interval;

import javax.servlet.http.HttpServletRequest;
@@ -51,6 +52,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -147,7 +149,8 @@ public class MetadataResource
  @Produces(MediaType.APPLICATION_JSON)
  public Response getDatabaseSegments(
      @Context final HttpServletRequest req,
-      @QueryParam("datasources") final Set<String> datasources
+      @QueryParam("datasources") final Set<String> datasources,
+      @QueryParam("includeOvershadowedStatus") final String includeOvershadowedStatus
  )
  {
    // If we haven't polled the metadata store yet, use an empty list of datasources.
@@ -159,14 +162,61 @@ public class MetadataResource
    }
    final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());

+    if (includeOvershadowedStatus != null) {
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
+          req,
+          druidDataSources,
+          metadataSegments
+      );
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    } else {
-    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-        AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
-    final Iterable<DataSegment> authorizedSegments =
-        AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper);
-    final Response.ResponseBuilder builder = Response.status(Response.Status.OK);
-    return builder.entity(authorizedSegments).build();
+      final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          req,
+          metadataSegments::iterator,
+          raGenerator,
+          authorizerMapper
+      );
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    }
+  }
+
+  private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
+      HttpServletRequest req,
+      Collection<ImmutableDruidDataSource> druidDataSources,
+      Stream<DataSegment> metadataSegments
+  )
+  {
+    // It's fine to add all overshadowed segments to a single collection because only
+    // a small fraction of the segments in the cluster are expected to be overshadowed,
+    // so building this collection shouldn't generate a lot of garbage.
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (ImmutableDruidDataSource dataSource : druidDataSources) {
+      overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
+    }
+
+    final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+            segment,
+            overshadowedSegments.contains(segment)
+        ));
+
+    final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
+        .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
+    final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+        req,
+        segmentsWithOvershadowedStatus::iterator,
+        raGenerator,
+        authorizerMapper
+    );
+    return authorizedSegments;
  }

  @GET
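A quick way to exercise the new query parameter end to end is shown below; this is a sketch assuming a Coordinator at localhost:8081, while the path and parameter are the ones added in this patch. Note that the mere presence of `includeOvershadowedStatus`, with any value, switches the response to SegmentWithOvershadowedStatus objects.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MetadataSegmentsRequest
{
  public static void main(String[] args) throws Exception
  {
    // Assumes a Coordinator running at localhost:8081.
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"))
        .build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Each array element has the form {"dataSegment": {...}, "overshadowed": true|false}.
    System.out.println(response.body());
  }
}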

File: MetadataSegmentView

@@ -23,7 +23,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DataSegmentInterner;
@@ -32,7 +34,6 @@ import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -43,19 +44,17 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.DateTime;

-import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;

/**
 * This class polls the coordinator in background to keep the latest published segments.
@@ -73,12 +72,19 @@ public class MetadataSegmentView
  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
  private final boolean isCacheEnabled;

-  @Nullable
-  private final ConcurrentMap<DataSegment, DateTime> publishedSegments;
+  /**
+   * Use {@link ImmutableSortedSet} so that the order of segments is deterministic and
+   * sys.segments queries return the segments in sorted order based on segmentId.
+   *
+   * Volatile since this reference is reassigned in {@code poll()} and then read in {@code getPublishedSegments()}
+   * from other threads.
+   */
+  @MonotonicNonNull
+  private volatile ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null;
  private final ScheduledExecutorService scheduledExec;
  private final long pollPeriodInMS;
  private final LifecycleLock lifecycleLock = new LifecycleLock();
-  private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
+  private final CountDownLatch cachePopulated = new CountDownLatch(1);

  @Inject
  public MetadataSegmentView(
@@ -96,7 +102,6 @@ public class MetadataSegmentView
    this.segmentWatcherConfig = segmentWatcherConfig;
    this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
    this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
-    this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null;
    this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
  }

@@ -134,40 +139,32 @@ public class MetadataSegmentView
  private void poll()
  {
    log.info("polling published segments from coordinator");
-    final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
+    final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
        coordinatorDruidLeaderClient,
        jsonMapper,
        responseHandler,
        segmentWatcherConfig.getWatchedDataSources()
    );

-    final DateTime timestamp = DateTimes.nowUtc();
+    final ImmutableSortedSet.Builder<SegmentWithOvershadowedStatus> builder = ImmutableSortedSet.naturalOrder();
    while (metadataSegments.hasNext()) {
-      final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next());
-      // timestamp is used to filter deleted segments
-      publishedSegments.put(interned, timestamp);
+      final SegmentWithOvershadowedStatus segment = metadataSegments.next();
+      final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
+      final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus(
+          interned,
+          segment.isOvershadowed()
+      );
+      builder.add(segmentWithOvershadowedStatus);
    }
-    // filter the segments from cache whose timestamp is not equal to latest timestamp stored,
-    // since the presence of a segment with an earlier timestamp indicates that
-    // "that" segment is not returned by coordinator in latest poll, so it's
-    // likely deleted and therefore we remove it from publishedSegments
-    // Since segments are not atomically replaced because it can cause high
-    // memory footprint due to large number of published segments, so
-    // we are incrementally removing deleted segments from the map
-    // This means publishedSegments will be eventually consistent with
-    // the segments in coordinator
-    publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp);
-    cachePopulated.set(true);
+    publishedSegments = builder.build();
+    cachePopulated.countDown();
  }

-  public Iterator<DataSegment> getPublishedSegments()
+  public Iterator<SegmentWithOvershadowedStatus> getPublishedSegments()
  {
    if (isCacheEnabled) {
-      Preconditions.checkState(
-          lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(),
-          "hold on, still syncing published segments"
-      );
-      return publishedSegments.keySet().iterator();
+      Uninterruptibles.awaitUninterruptibly(cachePopulated);
+      return publishedSegments.iterator();
    } else {
      return getMetadataSegments(
          coordinatorDruidLeaderClient,
@@ -179,14 +176,14 @@ public class MetadataSegmentView
  }

  // Note that coordinator must be up to get segments
-  private JsonParserIterator<DataSegment> getMetadataSegments(
+  private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
      DruidLeaderClient coordinatorClient,
      ObjectMapper jsonMapper,
      BytesAccumulatingResponseHandler responseHandler,
      Set<String> watchedDataSources
  )
  {
-    String query = "/druid/coordinator/v1/metadata/segments";
+    String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus";
    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
      log.debug(
          "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources);
@@ -195,7 +192,7 @@ public class MetadataSegmentView
        sb.append("datasources=").append(ds).append("&");
      }
      sb.setLength(sb.length() - 1);
-      query = "/druid/coordinator/v1/metadata/segments?" + sb;
+      query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb;
    }
    Request request;
    try {
@@ -213,7 +210,7 @@ public class MetadataSegmentView
        responseHandler
    );

-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentWithOvershadowedStatus>()
    {
    });
    return new JsonParserIterator<>(
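The concurrency change above replaces a `ConcurrentHashMap` with timestamp-based eviction by an atomically swapped immutable snapshot guarded by a `CountDownLatch`. Below is a minimal distillation of that pattern (my own sketch, not the Druid API). The old code avoided full replacement to limit memory footprint; the new code accepts that cost in exchange for deterministic ordering and simpler consistency, leaning on `DataSegmentInterner` to curb duplication.

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.Uninterruptibles;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;

// Readers either block until the first snapshot exists or see a complete, sorted,
// immutable snapshot; deleted entries disappear on the next refresh for free.
public class SnapshotCache<T extends Comparable<T>>
{
  private volatile ImmutableSortedSet<T> snapshot = null;
  private final CountDownLatch populated = new CountDownLatch(1);

  public void refresh(Iterable<T> items)
  {
    snapshot = ImmutableSortedSet.copyOf(items); // single volatile write publishes the whole set
    populated.countDown();                       // no-op after the first refresh
  }

  public Iterator<T> read()
  {
    Uninterruptibles.awaitUninterruptibly(populated); // wait only for the first poll
    return snapshot.iterator();
  }
}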

File: SystemSchema

@@ -69,6 +69,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.jboss.netty.handler.codec.http.HttpMethod;

import javax.annotation.Nullable;
@@ -92,6 +93,17 @@ public class SystemSchema extends AbstractSchema
  private static final String SERVER_SEGMENTS_TABLE = "server_segments";
  private static final String TASKS_TABLE = "tasks";

+  /**
+   * Boolean constants represented as long type, where 1 = true and 0 = false,
+   * to make it easy to count the number of segments which are published,
+   * available, etc.
+   */
+  private static final long IS_PUBLISHED_FALSE = 0L;
+  private static final long IS_PUBLISHED_TRUE = 1L;
+  private static final long IS_AVAILABLE_TRUE = 1L;
+  private static final long IS_OVERSHADOWED_FALSE = 0L;
+  private static final long IS_OVERSHADOWED_TRUE = 1L;
+
  static final RowSignature SEGMENTS_SIGNATURE = RowSignature
      .builder()
      .add("segment_id", ValueType.STRING)
@@ -106,6 +118,7 @@ public class SystemSchema extends AbstractSchema
      .add("is_published", ValueType.LONG)
      .add("is_available", ValueType.LONG)
      .add("is_realtime", ValueType.LONG)
+      .add("is_overshadowed", ValueType.LONG)
      .add("payload", ValueType.STRING)
      .build();
@@ -189,14 +202,6 @@ public class SystemSchema extends AbstractSchema
    private final AuthorizerMapper authorizerMapper;
    private final MetadataSegmentView metadataView;

-    /**
-     * Booleans constants used for available segments represented as long type,
-     * where 1 = true and 0 = false to make it easy to count number of segments
-     * which are published, available
-     */
-    private static final long DEFAULT_IS_PUBLISHED = 0;
-    private static final long DEFAULT_IS_AVAILABLE = 1;
-
    public SegmentsTable(
        DruidSchema druidSchemna,
        MetadataSegmentView metadataView,
@@ -235,12 +240,12 @@ public class SystemSchema extends AbstractSchema
          Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
      for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
        PartialSegmentData partialSegmentData =
-            new PartialSegmentData(DEFAULT_IS_AVAILABLE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
+            new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
        partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
      }

      //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
-      final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();
+      final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();

      final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();

@@ -251,8 +256,9 @@ public class SystemSchema extends AbstractSchema
          ))
          .transform(val -> {
            try {
-              segmentsAlreadySeen.add(val.getId());
-              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId());
+              final DataSegment segment = val.getDataSegment();
+              segmentsAlreadySeen.add(segment.getId());
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
              long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
              if (partialSegmentData != null) {
                numReplicas = partialSegmentData.getNumReplicas();
@@ -261,23 +267,24 @@ public class SystemSchema extends AbstractSchema
                isRealtime = partialSegmentData.isRealtime();
              }
              return new Object[]{
-                  val.getId(),
-                  val.getDataSource(),
-                  val.getInterval().getStart().toString(),
-                  val.getInterval().getEnd().toString(),
-                  val.getSize(),
-                  val.getVersion(),
-                  Long.valueOf(val.getShardSpec().getPartitionNum()),
+                  segment.getId(),
+                  segment.getDataSource(),
+                  segment.getInterval().getStart().toString(),
+                  segment.getInterval().getEnd().toString(),
+                  segment.getSize(),
+                  segment.getVersion(),
+                  Long.valueOf(segment.getShardSpec().getPartitionNum()),
                  numReplicas,
                  numRows,
-                  1L, //is_published is true for published segments
+                  IS_PUBLISHED_TRUE, //is_published is true for published segments
                  isAvailable,
                  isRealtime,
+                  val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
                  jsonMapper.writeValueAsString(val)
              };
            }
            catch (JsonProcessingException e) {
-              throw new RE(e, "Error getting segment payload for segment %s", val.getId());
+              throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId());
            }
          });

@@ -303,9 +310,10 @@ public class SystemSchema extends AbstractSchema
                Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
                numReplicas,
                val.getValue().getNumRows(),
-                DEFAULT_IS_PUBLISHED,
-                DEFAULT_IS_AVAILABLE,
+                IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
+                IS_AVAILABLE_TRUE, // is_available is assumed to be always true for segments announced by historicals or realtime tasks
                val.getValue().isRealtime(),
+                IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
                jsonMapper.writeValueAsString(val.getKey())
            };
          }
@@ -322,18 +330,18 @@ public class SystemSchema extends AbstractSchema
    }

-    private Iterator<DataSegment> getAuthorizedPublishedSegments(
-        Iterator<DataSegment> it,
+    private Iterator<SegmentWithOvershadowedStatus> getAuthorizedPublishedSegments(
+        Iterator<SegmentWithOvershadowedStatus> it,
        DataContext root
    )
    {
      final AuthenticationResult authenticationResult =
          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);

-      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+      Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));

-      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
          authenticationResult,
          () -> it,
          raGenerator,

File: SystemSchemaTest

@@ -76,6 +76,7 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -381,7 +382,7 @@ public class SystemSchemaTest extends CalciteTestBase
    final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
    final List<RelDataTypeField> fields = rowType.getFieldList();

-    Assert.assertEquals(13, fields.size());
+    Assert.assertEquals(14, fields.size());

    final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
    final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@@ -408,11 +409,14 @@ public class SystemSchemaTest extends CalciteTestBase
        .withConstructor(druidSchema, metadataView, mapper, authMapper)
        .createMock();
    EasyMock.replay(segmentsTable);
-    final Set<DataSegment> publishedSegments = Stream.of(publishedSegment1,
-                                                         publishedSegment2,
-                                                         publishedSegment3,
-                                                         segment1,
-                                                         segment2).collect(Collectors.toSet());
+    final Set<SegmentWithOvershadowedStatus> publishedSegments = Stream.of(
+        new SegmentWithOvershadowedStatus(publishedSegment1, true),
+        new SegmentWithOvershadowedStatus(publishedSegment2, false),
+        new SegmentWithOvershadowedStatus(publishedSegment3, false),
+        new SegmentWithOvershadowedStatus(segment1, true),
+        new SegmentWithOvershadowedStatus(segment2, false)
+    ).collect(Collectors.toSet());
    EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();

    EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
@@ -463,7 +467,8 @@ public class SystemSchemaTest extends CalciteTestBase
        3L, //numRows
        1L, //is_published
        1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
    );

    verifyRow(
@@ -475,7 +480,8 @@ public class SystemSchemaTest extends CalciteTestBase
        3L, //numRows
        1L, //is_published
        1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
    );

    //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
@@ -488,7 +494,8 @@ public class SystemSchemaTest extends CalciteTestBase
        2L, //numRows
        0L, //is_published
        1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
    );

    verifyRow(
@@ -500,7 +507,8 @@ public class SystemSchemaTest extends CalciteTestBase
        0L, //numRows
        0L, //is_published
        1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
    );

    verifyRow(
@@ -512,7 +520,8 @@ public class SystemSchemaTest extends CalciteTestBase
        0L, //numRows
        0L, //is_published
        1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
    );

    // wikipedia segments are published and unavailable, num_replicas is 0
@@ -525,7 +534,8 @@ public class SystemSchemaTest extends CalciteTestBase
        0L, //numRows
        1L, //is_published
        0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
    );

    verifyRow(
@@ -537,7 +547,8 @@ public class SystemSchemaTest extends CalciteTestBase
        0L, //numRows
        1L, //is_published
        0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
    );

    verifyRow(
@@ -549,7 +560,8 @@ public class SystemSchemaTest extends CalciteTestBase
        0L, //numRows
        1L, //is_published
        0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
    );

    // Verify value types.
@@ -565,7 +577,8 @@ public class SystemSchemaTest extends CalciteTestBase
      long numRows,
      long isPublished,
      long isAvailable,
-      long isRealtime)
+      long isRealtime,
+      long isOvershadowed)
  {
    Assert.assertEquals(segmentId, row[0].toString());
    SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -580,6 +593,7 @@ public class SystemSchemaTest extends CalciteTestBase
    Assert.assertEquals(isPublished, row[9]);
    Assert.assertEquals(isAvailable, row[10]);
    Assert.assertEquals(isRealtime, row[11]);
+    Assert.assertEquals(isOvershadowed, row[12]);
  }

  @Test