SQL: Use SegmentId instead of DataSegment as set/map keys. (#7796)

Recently we've been talking about using SegmentId as a set/map key rather than
DataSegment, because SegmentId's sense of equality is better defined. This
refactor applies that change in the druid-sql module, which mostly involves
DruidSchema and some related classes. It should have no user-visible effects.
Gian Merlino, 2019-05-30 12:58:36 -07:00, committed by GitHub
commit 58a571ccda (parent 8649b8ab4c)
6 changed files with 134 additions and 105 deletions
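To illustrate the motivation (a sketch, not part of this commit; the values are made up): SegmentId is an immutable value object whose equals()/hashCode() cover exactly a segment's identifying coordinates (dataSource, interval, version, partition number), while DataSegment also carries non-identifying state such as its loadSpec and size. Keying a map by SegmentId therefore behaves predictably:

    // Sketch: SegmentId.of(...) is the factory visible in the SegmentId diff below.
    final SegmentId id = SegmentId.of("foo", Intervals.of("2019-01-01/2019-01-02"), "v1", 0);
    final Map<SegmentId, Long> numRowsById = new HashMap<>();
    numRowsById.put(id, 100L);

    // An id rebuilt from the same coordinates is equal to the original key,
    // regardless of DataSegment-level state like loadSpec or size.
    final SegmentId sameId = SegmentId.of("foo", Intervals.of("2019-01-01/2019-01-02"), "v1", 0);
    // numRowsById.get(sameId) returns 100L.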

DataSegment.java

@@ -253,7 +253,7 @@ public class DataSegment implements Comparable<DataSegment>
   public SegmentDescriptor toDescriptor()
   {
-    return new SegmentDescriptor(getInterval(), getVersion(), shardSpec.getPartitionNum());
+    return id.toDescriptor();
   }

   public DataSegment withLoadSpec(Map<String, Object> loadSpec)

SegmentId.java

@@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Chronology;
 import org.joda.time.DateTime;
@@ -338,6 +339,11 @@ public final class SegmentId implements Comparable<SegmentId>
     return of(dataSource, newInterval, version, partitionNum);
   }

+  public SegmentDescriptor toDescriptor()
+  {
+    return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
+  }
+
   @Override
   public boolean equals(Object o)
   {

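The new SegmentId#toDescriptor() lets callers build query segment specs straight from ids, which DruidSchema#runSegmentMetadataQuery relies on below. A minimal usage sketch (assuming a collection named segmentIds):

    // Sketch: build a spec for a segment-metadata query from SegmentIds alone,
    // with no DataSegment objects in hand.
    final MultipleSpecificSegmentSpec spec = new MultipleSpecificSegmentSpec(
        segmentIds.stream().map(SegmentId::toDescriptor).collect(Collectors.toList())
    );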
AvailableSegmentMetadata.java

@@ -20,7 +20,7 @@
 package org.apache.druid.sql.calcite.schema;

 import org.apache.druid.sql.calcite.table.RowSignature;
-import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.DataSegment;

 import javax.annotation.Nullable;
 import java.util.Set;
@@ -32,20 +32,20 @@ import java.util.Set;
 public class AvailableSegmentMetadata
 {
   public static Builder builder(
-      SegmentId segmentId,
+      DataSegment segment,
       long isRealtime,
       Set<String> segmentServers,
       RowSignature rowSignature,
       long numRows
   )
   {
-    return new Builder(segmentId, isRealtime, segmentServers, rowSignature, numRows);
+    return new Builder(segment, isRealtime, segmentServers, rowSignature, numRows);
   }

   public static Builder from(AvailableSegmentMetadata h)
   {
     return new Builder(
-        h.getSegmentId(),
+        h.getSegment(),
         h.isRealtime(),
         h.getReplicas(),
         h.getRowSignature(),
@@ -53,7 +53,7 @@ public class AvailableSegmentMetadata
     );
   }

-  private final SegmentId segmentId;
+  private final DataSegment segment;
   // Booleans represented as long type, where 1 = true and 0 = false
   // to make it easy to count number of segments which are realtime
   private final long isRealtime;
@@ -69,7 +69,7 @@ public class AvailableSegmentMetadata
     this.isRealtime = builder.isRealtime;
     this.segmentServers = builder.segmentServers;
     this.numRows = builder.numRows;
-    this.segmentId = builder.segmentId;
+    this.segment = builder.segment;
   }

   public long isRealtime()
@@ -77,9 +77,9 @@ public class AvailableSegmentMetadata
     return isRealtime;
   }

-  public SegmentId getSegmentId()
+  public DataSegment getSegment()
   {
-    return segmentId;
+    return segment;
   }

   public Set<String> getReplicas()
@@ -105,7 +105,7 @@ public class AvailableSegmentMetadata
   public static class Builder
   {
-    private final SegmentId segmentId;
+    private final DataSegment segment;
     private final long isRealtime;
     private Set<String> segmentServers;
@@ -114,14 +114,14 @@ public class AvailableSegmentMetadata
     private long numRows;

     private Builder(
-        SegmentId segmentId,
+        DataSegment segment,
         long isRealtime,
         Set<String> servers,
-        RowSignature rowSignature,
+        @Nullable RowSignature rowSignature,
         long numRows
     )
     {
-      this.segmentId = segmentId;
+      this.segment = segment;
       this.isRealtime = isRealtime;
       this.segmentServers = servers;
       this.rowSignature = rowSignature;

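With the builder now taking the full DataSegment, each metadata row keeps a handle on segment details (size, shard spec) even though the surrounding maps are keyed by SegmentId. A sketch of the pattern DruidSchema#addSegment uses below (the server name is illustrative):

    // Sketch: 0L = served by a historical (not realtime); rowSignature is null
    // until the segment-metadata refresh fills it in.
    final AvailableSegmentMetadata metadata = AvailableSegmentMetadata
        .builder(segment, 0L, ImmutableSet.of("historical-1"), null, 0L)
        .build();
    final SegmentId key = metadata.getSegment().getId(); // the SegmentId map key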
DruidSchema.java

@@ -38,6 +38,7 @@ import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -64,6 +65,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.sql.calcite.view.DruidViewMacro;
 import org.apache.druid.sql.calcite.view.ViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;

 import java.io.IOException;
 import java.util.Comparator;
@@ -86,8 +88,8 @@ import java.util.stream.StreamSupport;
 public class DruidSchema extends AbstractSchema
 {
   // Newest segments first, so they override older ones.
-  private static final Comparator<DataSegment> SEGMENT_ORDER = Comparator
-      .comparing((DataSegment segment) -> segment.getInterval().getStart())
+  private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
+      .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart())
       .reversed()
       .thenComparing(Function.identity());
@@ -112,17 +114,17 @@ public class DruidSchema extends AbstractSchema
   // DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
   // Use TreeMap for segments so they are merged in deterministic order, from older to newer.
   @GuardedBy("lock")
-  private final Map<String, TreeMap<DataSegment, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
+  private final Map<String, TreeMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
   private int totalSegments = 0;

   // All mutable segments.
-  private final Set<DataSegment> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
+  private final Set<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);

   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();

   // All segments that need to be refreshed.
-  private final TreeSet<DataSegment> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
+  private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);

   // Escalator, so we can attach an authentication result to queries we generate.
   private final Escalator escalator;
@@ -202,7 +204,7 @@ public class DruidSchema extends AbstractSchema
             {
               try {
                 while (!Thread.currentThread().isInterrupted()) {
-                  final Set<DataSegment> segmentsToRefresh = new TreeSet<>();
+                  final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
                   final Set<String> dataSourcesToRebuild = new TreeSet<>();

                   try {
@@ -251,7 +253,7 @@ public class DruidSchema extends AbstractSchema
                   }

                   // Refresh the segments.
-                  final Set<DataSegment> refreshed = refreshSegments(segmentsToRefresh);
+                  final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);

                   synchronized (lock) {
                     // Add missing segments back to the refresh list.
@@ -354,26 +356,26 @@ public class DruidSchema extends AbstractSchema
   void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
     synchronized (lock) {
-      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment) : null;
+      final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
       if (segmentMetadata == null) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
         final Set<String> servers = ImmutableSet.of(server.getName());
         segmentMetadata = AvailableSegmentMetadata.builder(
-            segment.getId(),
+            segment,
             isRealtime,
             servers,
             null,
             DEFAULT_NUM_ROWS
         ).build();
         // Unknown segment.
-        setAvailableSegmentMetadata(segment, segmentMetadata);
-        segmentsNeedingRefresh.add(segment);
+        setAvailableSegmentMetadata(segment.getId(), segmentMetadata);
+        segmentsNeedingRefresh.add(segment.getId());
         if (!server.segmentReplicatable()) {
           log.debug("Added new mutable segment[%s].", segment.getId());
-          mutableSegments.add(segment);
+          mutableSegments.add(segment.getId());
         } else {
           log.debug("Added new immutable segment[%s].", segment.getId());
         }
@@ -387,11 +389,11 @@ public class DruidSchema extends AbstractSchema
             .from(segmentMetadata)
             .withReplicas(servers)
             .build();
-        knownSegments.put(segment, metadataWithNumReplicas);
+        knownSegments.put(segment.getId(), metadataWithNumReplicas);
         if (server.segmentReplicatable()) {
           // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
           // even if it's also available on non-replicatable (realtime) servers.
-          mutableSegments.remove(segment);
+          mutableSegments.remove(segment.getId());
           log.debug("Segment[%s] has become immutable.", segment.getId());
         }
       }
@@ -410,12 +412,12 @@ public class DruidSchema extends AbstractSchema
       log.debug("Segment[%s] is gone.", segment.getId());

       dataSourcesNeedingRebuild.add(segment.getDataSource());
-      segmentsNeedingRefresh.remove(segment);
-      mutableSegments.remove(segment);
+      segmentsNeedingRefresh.remove(segment.getId());
+      mutableSegments.remove(segment.getId());

-      final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
+      final Map<SegmentId, AvailableSegmentMetadata> dataSourceSegments =
           segmentMetadataInfo.get(segment.getDataSource());
-      if (dataSourceSegments.remove(segment) != null) {
+      if (dataSourceSegments.remove(segment.getId()) != null) {
         totalSegments--;
       }
@@ -433,8 +435,8 @@ public class DruidSchema extends AbstractSchema
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
-      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment);
+      final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId());
       final Set<String> segmentServers = segmentMetadata.getReplicas();
       final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
                                                          .filter(Predicates.not(Predicates.equalTo(server.getName())))
@@ -443,7 +445,7 @@ public class DruidSchema extends AbstractSchema
           .from(segmentMetadata)
           .withReplicas(servers)
           .build();
-      knownSegments.put(segment, metadataWithNumReplicas);
+      knownSegments.put(segment.getId(), metadataWithNumReplicas);
       lock.notifyAll();
     }
   }
@@ -453,19 +455,19 @@ public class DruidSchema extends AbstractSchema
    * which may be a subset of the asked-for set.
    */
   @VisibleForTesting
-  Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+  Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
   {
-    final Set<DataSegment> retVal = new HashSet<>();
+    final Set<SegmentId> retVal = new HashSet<>();

     // Organize segments by dataSource.
-    final Map<String, TreeSet<DataSegment>> segmentMap = new TreeMap<>();
+    final Map<String, TreeSet<SegmentId>> segmentMap = new TreeMap<>();

-    for (DataSegment segment : segments) {
-      segmentMap.computeIfAbsent(segment.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
-                .add(segment);
+    for (SegmentId segmentId : segments) {
+      segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
+                .add(segmentId);
     }

-    for (Map.Entry<String, TreeSet<DataSegment>> entry : segmentMap.entrySet()) {
+    for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
       final String dataSource = entry.getKey();
       retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
     }
@@ -477,17 +479,22 @@ public class DruidSchema extends AbstractSchema
    * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
    * segments actually refreshed, which may be a subset of the asked-for set.
    */
-  private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, final Set<DataSegment> segments)
+  private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
       throws IOException
   {
+    if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
+      // Sanity check. We definitely expect this to pass.
+      throw new ISE("'segments' must all match 'dataSource'!");
+    }
+
     log.debug("Refreshing metadata for dataSource[%s].", dataSource);

     final long startTime = System.currentTimeMillis();

-    // Segment id -> segment object.
-    final Map<String, DataSegment> segmentMap = Maps.uniqueIndex(segments, segment -> segment.getId().toString());
+    // Segment id string -> SegmentId object.
+    final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);

-    final Set<DataSegment> retVal = new HashSet<>();
+    final Set<SegmentId> retVal = new HashSet<>();
     final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
         queryLifecycleFactory,
         Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
@@ -499,24 +506,28 @@ public class DruidSchema extends AbstractSchema
     try {
       while (!yielder.isDone()) {
         final SegmentAnalysis analysis = yielder.get();
-        final DataSegment segment = segmentMap.get(analysis.getId());
+        final SegmentId segmentId = segmentIdMap.get(analysis.getId());

-        if (segment == null) {
+        if (segmentId == null) {
           log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
         } else {
           synchronized (lock) {
             final RowSignature rowSignature = analysisToRowSignature(analysis);
-            log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
-            final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
-                segmentMetadataInfo.get(segment.getDataSource());
+            log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
+            final Map<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.get(dataSource);
             if (dataSourceSegments == null) {
-              log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
+              // Datasource may have been removed or become unavailable while this refresh was ongoing.
+              log.warn(
+                  "No segment map found with datasource[%s], skipping refresh of segment[%s]",
+                  dataSource,
+                  segmentId
+              );
             } else {
-              final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segment);
+              final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segmentId);
               if (segmentMetadata == null) {
                 log.warn(
                     "No segment[%s] found, skipping refresh",
-                    segment.getId()
+                    segmentId
                 );
               } else {
                 final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
@@ -524,9 +535,9 @@ public class DruidSchema extends AbstractSchema
                     .withRowSignature(rowSignature)
                     .withNumRows(analysis.getNumRows())
                     .build();
-                dataSourceSegments.put(segment, updatedSegmentMetadata);
-                setAvailableSegmentMetadata(segment, updatedSegmentMetadata);
-                retVal.add(segment);
+                dataSourceSegments.put(segmentId, updatedSegmentMetadata);
+                setAvailableSegmentMetadata(segmentId, updatedSegmentMetadata);
+                retVal.add(segmentId);
               }
             }
           }
@@ -551,14 +562,14 @@ public class DruidSchema extends AbstractSchema
   }

   @VisibleForTesting
-  void setAvailableSegmentMetadata(final DataSegment segment, final AvailableSegmentMetadata availableSegmentMetadata)
+  void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata)
   {
     synchronized (lock) {
-      TreeMap<DataSegment, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
-          segment.getDataSource(),
+      TreeMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
+          segmentId.getDataSource(),
           x -> new TreeMap<>(SEGMENT_ORDER)
       );
-      if (dataSourceSegments.put(segment, availableSegmentMetadata) == null) {
+      if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) {
        totalSegments++;
      }
    }
@@ -567,7 +578,7 @@ public class DruidSchema extends AbstractSchema
   private DruidTable buildDruidTable(final String dataSource)
   {
     synchronized (lock) {
-      final Map<DataSegment, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
+      final Map<SegmentId, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
       final Map<String, ValueType> columnTypes = new TreeMap<>();

       if (segmentMap != null) {
@@ -590,19 +601,19 @@ public class DruidSchema extends AbstractSchema
   private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
       final QueryLifecycleFactory queryLifecycleFactory,
-      final Iterable<DataSegment> segments,
+      final Iterable<SegmentId> segments,
       final AuthenticationResult authenticationResult
   )
   {
     // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
     final String dataSource = Iterables.getOnlyElement(
         StreamSupport.stream(segments.spliterator(), false)
-                     .map(DataSegment::getDataSource).collect(Collectors.toSet())
+                     .map(SegmentId::getDataSource).collect(Collectors.toSet())
     );

     final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
         StreamSupport.stream(segments.spliterator(), false)
-                     .map(DataSegment::toDescriptor).collect(Collectors.toList())
+                     .map(SegmentId::toDescriptor).collect(Collectors.toList())
     );

     final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
@@ -643,11 +654,11 @@ public class DruidSchema extends AbstractSchema
     return rowSignatureBuilder.build();
   }

-  Map<DataSegment, AvailableSegmentMetadata> getSegmentMetadata()
+  Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
   {
-    final Map<DataSegment, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
     synchronized (lock) {
-      for (TreeMap<DataSegment, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
+      for (TreeMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
         segmentMetadata.putAll(val);
       }
     }

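A sketch of how the retyped SEGMENT_ORDER comparator behaves (the intervals here are illustrative): ids whose intervals start later sort first, so TreeSets like segmentsNeedingRefresh hand out the newest segments before older ones:

    final TreeSet<SegmentId> ordered = new TreeSet<>(SEGMENT_ORDER);
    ordered.add(SegmentId.of("foo", Intervals.of("2019-01-01/P1D"), "v1", 0));
    ordered.add(SegmentId.of("foo", Intervals.of("2019-01-02/P1D"), "v1", 0));
    // ordered.first() is the 2019-01-02 segment: newest first, so it overrides older ones.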
SystemSchema.java

@@ -255,8 +255,9 @@ public class SystemSchema extends AbstractSchema
     public Enumerable<Object[]> scan(DataContext root)
     {
       //get available segments from druidSchema
-      final Map<DataSegment, AvailableSegmentMetadata> availableSegmentMetadata = druidSchema.getSegmentMetadata();
-      final Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries =
+      final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
+          druidSchema.getSegmentMetadataSnapshot();
+      final Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries =
           availableSegmentMetadata.entrySet().iterator();

       // in memory map to store segment data from available segments
@@ -265,10 +266,11 @@
       for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
             new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
-        partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
+        partialSegmentDataMap.put(h.getSegment().getId(), partialSegmentData);
       }

-      //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
+      // get published segments from metadata segment cache (if enabled in sql planner config), else directly from
+      // coordinator
       final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();

       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
@@ -319,19 +321,19 @@ public class SystemSchema extends AbstractSchema
           ))
           .transform(val -> {
             try {
-              if (segmentsAlreadySeen.contains(val.getKey().getId())) {
+              if (segmentsAlreadySeen.contains(val.getKey())) {
                 return null;
               }
-              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getId());
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
               final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
               return new Object[]{
-                  val.getKey().getId(),
+                  val.getKey(),
                   val.getKey().getDataSource(),
                   val.getKey().getInterval().getStart().toString(),
                   val.getKey().getInterval().getEnd().toString(),
-                  val.getKey().getSize(),
+                  val.getValue().getSegment().getSize(),
                   val.getKey().getVersion(),
-                  Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
+                  (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
                  numReplicas,
                  val.getValue().getNumRows(),
                  IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
@@ -342,7 +344,7 @@ public class SystemSchema extends AbstractSchema
               };
             }
             catch (JsonProcessingException e) {
-              throw new RE(e, "Error getting segment payload for segment %s", val.getKey().getId());
+              throw new RE(e, "Error getting segment payload for segment %s", val.getKey());
             }
           });
@@ -371,18 +373,18 @@ public class SystemSchema extends AbstractSchema
       return authorizedSegments.iterator();
     }

-    private Iterator<Entry<DataSegment, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
-        Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries,
+    private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
+        Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries,
         DataContext root
     )
     {
       final AuthenticationResult authenticationResult =
           (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);

-      Function<Entry<DataSegment, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
+      Function<Entry<SegmentId, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
          .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource()));

-      final Iterable<Entry<DataSegment, AvailableSegmentMetadata>> authorizedSegments =
+      final Iterable<Entry<SegmentId, AvailableSegmentMetadata>> authorizedSegments =
           AuthorizationUtils.filterAuthorizedResources(
               authenticationResult,
               () -> availableSegmentEntries,

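Because getSegmentMetadataSnapshot() copies the per-dataSource maps while holding DruidSchema's lock, callers like SystemSchema can iterate the returned map without further synchronization. A usage sketch (the printout is illustrative):

    final Map<SegmentId, AvailableSegmentMetadata> snapshot = druidSchema.getSegmentMetadataSnapshot();
    for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : snapshot.entrySet()) {
      // Safe to read outside the lock; this is a point-in-time copy.
      System.out.println(entry.getKey() + " numRows=" + entry.getValue().getNumRows());
    }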
DruidSchemaTest.java

@@ -52,6 +52,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -66,7 +67,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;

 public class DruidSchemaTest extends CalciteTestBase
 {
@@ -254,8 +255,11 @@ public class DruidSchemaTest extends CalciteTestBase
   @Test
   public void testSegmentMetadataHolderNumRows()
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadata();
-    final Set<DataSegment> segments = segmentsMetadata.keySet();
+    Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentsMetadata.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(3, segments.size());
     // find the only segment with datasource "foo2"
     final DataSegment existingSegment = segments.stream()
@@ -263,16 +267,16 @@ public class DruidSchemaTest extends CalciteTestBase
                                                 .findFirst()
                                                 .orElse(null);
     Assert.assertNotNull(existingSegment);
-    final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment);
+    final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment.getId());
     // update AvailableSegmentMetadata of existingSegment with numRows=5
     AvailableSegmentMetadata updatedMetadata = AvailableSegmentMetadata.from(existingMetadata).withNumRows(5).build();
-    schema.setAvailableSegmentMetadata(existingSegment, updatedMetadata);
+    schema.setAvailableSegmentMetadata(existingSegment.getId(), updatedMetadata);
     // find a druidServer holding existingSegment
     final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
         .stream()
         .flatMap(druidServer -> druidServer
             .getLazyAllSegments().stream()
-            .filter(segment -> segment.equals(existingSegment))
+            .filter(segment -> segment.getId().equals(existingSegment.getId()))
             .map(segment -> Pair.of(druidServer, segment))
         )
         .findAny()
@@ -283,14 +287,14 @@ public class DruidSchemaTest extends CalciteTestBase
     final DruidServerMetadata druidServerMetadata = server.getMetadata();
     // invoke DruidSchema#addSegment on existingSegment
     schema.addSegment(druidServerMetadata, existingSegment);
-    segmentsMetadata = schema.getSegmentMetadata();
+    segmentsMetadata = schema.getSegmentMetadataSnapshot();
     // get the only segment with datasource "foo2"
     final DataSegment currentSegment = segments.stream()
                                                .filter(segment -> segment.getDataSource().equals("foo2"))
                                                .findFirst()
                                                .orElse(null);
-    final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment);
-    Assert.assertEquals(updatedMetadata.getSegmentId(), currentMetadata.getSegmentId());
+    final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment.getId());
+    Assert.assertEquals(updatedMetadata.getSegment().getId(), currentMetadata.getSegment().getId());
     Assert.assertEquals(updatedMetadata.getNumRows(), currentMetadata.getNumRows());
     // numreplicas do not change here since we addSegment with the same server which was serving existingSegment before
     Assert.assertEquals(updatedMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
@@ -299,8 +303,11 @@ public class DruidSchemaTest extends CalciteTestBase
   @Test
   public void testNullDatasource() throws IOException
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
-    Set<DataSegment> segments = segmentMetadatas.keySet();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentMetadatas.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(segments.size(), 3);
     // segments contains two segments with datasource "foo" and one with datasource "foo2"
     // let's remove the only segment with datasource "foo2"
@@ -308,31 +315,34 @@ public class DruidSchemaTest extends CalciteTestBase
                                                 .filter(segment -> segment.getDataSource().equals("foo2"))
                                                 .findFirst()
                                                 .orElse(null);
-    Assert.assertFalse(segmentToRemove == null);
+    Assert.assertNotNull(segmentToRemove);
     schema.removeSegment(segmentToRemove);
-    schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
-    segmentMetadatas = schema.getSegmentMetadata();
-    segments = segmentMetadatas.keySet();
-    Assert.assertEquals(segments.size(), 2);
+    // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
+    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
+    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
   }

   @Test
   public void testNullAvailableSegmentMetadata() throws IOException
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
-    Set<DataSegment> segments = segmentMetadatas.keySet();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentMetadatas.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(segments.size(), 3);
     // remove one of the segments with datasource "foo"
     final DataSegment segmentToRemove = segments.stream()
                                                 .filter(segment -> segment.getDataSource().equals("foo"))
                                                 .findFirst()
                                                 .orElse(null);
-    Assert.assertFalse(segmentToRemove == null);
+    Assert.assertNotNull(segmentToRemove);
     schema.removeSegment(segmentToRemove);
-    schema.refreshSegments(segments); // can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
-    segmentMetadatas = schema.getSegmentMetadata();
-    segments = segmentMetadatas.keySet();
-    Assert.assertEquals(segments.size(), 2);
+    // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
+    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
+    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
   }
 }