diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
index 97fd14a5831..fb6a287fc3a 100644
--- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -253,7 +253,7 @@ public class DataSegment implements Comparable<DataSegment>

   public SegmentDescriptor toDescriptor()
   {
-    return new SegmentDescriptor(getInterval(), getVersion(), shardSpec.getPartitionNum());
+    return id.toDescriptor();
   }

   public DataSegment withLoadSpec(Map<String, Object> loadSpec)
diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentId.java b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
index 2c21e21bc6e..8ac01e4e31a 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
@@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Chronology;
 import org.joda.time.DateTime;
@@ -338,6 +339,11 @@ public final class SegmentId implements Comparable<SegmentId>
     return of(dataSource, newInterval, version, partitionNum);
   }

+  public SegmentDescriptor toDescriptor()
+  {
+    return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
+  }
+
   @Override
   public boolean equals(Object o)
   {
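Side note on the new helper: `SegmentId.toDescriptor()` lets callers build the query-facing `SegmentDescriptor` from the id alone, and `DataSegment.toDescriptor()` above now simply delegates to it. A minimal usage sketch, with made-up segment values (not part of the patch):

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.SegmentId;

class ToDescriptorExample
{
  // Illustrative only: "wiki", the interval, "v1", and partition 0 are made-up values.
  static SegmentDescriptor describe()
  {
    SegmentId id = SegmentId.of("wiki", Intervals.of("2019-01-01/2019-01-02"), "v1", 0);
    // Equivalent to what DataSegment.toDescriptor() used to assemble by hand:
    // new SegmentDescriptor(interval, version, partitionNum).
    return id.toDescriptor();
  }
}
```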
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
index 72b69facdc7..1dc8e1ed779 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
@@ -20,7 +20,7 @@ package org.apache.druid.sql.calcite.schema;

 import org.apache.druid.sql.calcite.table.RowSignature;
-import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.DataSegment;

 import javax.annotation.Nullable;
 import java.util.Set;
@@ -32,20 +32,20 @@ import java.util.Set;
 public class AvailableSegmentMetadata
 {
   public static Builder builder(
-      SegmentId segmentId,
+      DataSegment segment,
       long isRealtime,
       Set<String> segmentServers,
       RowSignature rowSignature,
       long numRows
   )
   {
-    return new Builder(segmentId, isRealtime, segmentServers, rowSignature, numRows);
+    return new Builder(segment, isRealtime, segmentServers, rowSignature, numRows);
   }

   public static Builder from(AvailableSegmentMetadata h)
   {
     return new Builder(
-        h.getSegmentId(),
+        h.getSegment(),
         h.isRealtime(),
         h.getReplicas(),
         h.getRowSignature(),
@@ -53,7 +53,7 @@ public class AvailableSegmentMetadata
     );
   }

-  private final SegmentId segmentId;
+  private final DataSegment segment;
   // Booleans represented as long type, where 1 = true and 0 = false
   // to make it easy to count number of segments which are realtime
   private final long isRealtime;
@@ -69,7 +69,7 @@ public class AvailableSegmentMetadata
     this.isRealtime = builder.isRealtime;
     this.segmentServers = builder.segmentServers;
     this.numRows = builder.numRows;
-    this.segmentId = builder.segmentId;
+    this.segment = builder.segment;
   }

   public long isRealtime()
@@ -77,9 +77,9 @@ public class AvailableSegmentMetadata
     return isRealtime;
   }

-  public SegmentId getSegmentId()
+  public DataSegment getSegment()
   {
-    return segmentId;
+    return segment;
   }

   public Set<String> getReplicas()
@@ -105,7 +105,7 @@ public class AvailableSegmentMetadata

   public static class Builder
   {
-    private final SegmentId segmentId;
+    private final DataSegment segment;
     private final long isRealtime;

     private Set<String> segmentServers;
@@ -114,14 +114,14 @@ public class AvailableSegmentMetadata
     private long numRows;

     private Builder(
-        SegmentId segmentId,
+        DataSegment segment,
         long isRealtime,
         Set<String> servers,
-        RowSignature rowSignature,
+        @Nullable RowSignature rowSignature,
         long numRows
     )
     {
-      this.segmentId = segmentId;
+      this.segment = segment;
       this.isRealtime = isRealtime;
       this.segmentServers = servers;
       this.rowSignature = rowSignature;
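The net effect of this file: the metadata now carries the whole `DataSegment`, so the id, size, and shard spec are all reachable from one object. A rough sketch of the new call pattern, mirroring what `DruidSchema#addSegment` does below (names and placeholder values are illustrative):

```java
import com.google.common.collect.ImmutableSet;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.sql.calcite.schema.AvailableSegmentMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

class AvailableSegmentMetadataExample
{
  // Sketch: build metadata for a segment just discovered on a server; the row signature
  // is not known yet (null), and numRows starts at a placeholder of 0.
  static AvailableSegmentMetadata forNewSegment(DruidServerMetadata server, DataSegment segment)
  {
    final long isRealtime = server.segmentReplicatable() ? 0 : 1;
    return AvailableSegmentMetadata
        .builder(segment, isRealtime, ImmutableSet.of(server.getName()), null, 0)
        .build();
  }

  static SegmentId idOf(AvailableSegmentMetadata metadata)
  {
    return metadata.getSegment().getId(); // replaces the old getSegmentId()
  }
}
```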
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 758a5f61f98..2116fbb38d6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -38,6 +38,7 @@ import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -64,6 +65,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.sql.calcite.view.DruidViewMacro;
 import org.apache.druid.sql.calcite.view.ViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;

 import java.io.IOException;
 import java.util.Comparator;
@@ -86,8 +88,8 @@ import java.util.stream.StreamSupport;
 public class DruidSchema extends AbstractSchema
 {
   // Newest segments first, so they override older ones.
-  private static final Comparator<DataSegment> SEGMENT_ORDER = Comparator
-      .comparing((DataSegment segment) -> segment.getInterval().getStart())
+  private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
+      .comparing((SegmentId segmentId) -> segmentId.getInterval().getStart())
       .reversed()
       .thenComparing(Function.identity());

@@ -112,17 +114,17 @@ public class DruidSchema extends AbstractSchema
   // DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
   // Use TreeMap for segments so they are merged in deterministic order, from older to newer.
   @GuardedBy("lock")
-  private final Map<String, TreeMap<DataSegment, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
+  private final Map<String, TreeMap<SegmentId, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
   private int totalSegments = 0;

   // All mutable segments.
-  private final Set<DataSegment> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
+  private final Set<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);

   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();

   // All segments that need to be refreshed.
-  private final TreeSet<DataSegment> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
+  private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);

   // Escalator, so we can attach an authentication result to queries we generate.
   private final Escalator escalator;
@@ -202,7 +204,7 @@ public class DruidSchema extends AbstractSchema
            {
              try {
                while (!Thread.currentThread().isInterrupted()) {
-                  final Set<DataSegment> segmentsToRefresh = new TreeSet<>();
+                  final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
                  final Set<String> dataSourcesToRebuild = new TreeSet<>();

                  try {
@@ -251,7 +253,7 @@ public class DruidSchema extends AbstractSchema
                    }

                    // Refresh the segments.
-                    final Set<DataSegment> refreshed = refreshSegments(segmentsToRefresh);
+                    final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);

                    synchronized (lock) {
                      // Add missing segments back to the refresh list.
@@ -354,26 +356,26 @@ public class DruidSchema extends AbstractSchema
   void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
     synchronized (lock) {
-      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment) : null;
+      final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
       if (segmentMetadata == null) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
         final Set<String> servers = ImmutableSet.of(server.getName());
         segmentMetadata = AvailableSegmentMetadata.builder(
-            segment.getId(),
+            segment,
             isRealtime,
             servers,
             null,
             DEFAULT_NUM_ROWS
         ).build(); // Unknown segment.
-        setAvailableSegmentMetadata(segment, segmentMetadata);
-        segmentsNeedingRefresh.add(segment);
+        setAvailableSegmentMetadata(segment.getId(), segmentMetadata);
+        segmentsNeedingRefresh.add(segment.getId());
         if (!server.segmentReplicatable()) {
           log.debug("Added new mutable segment[%s].", segment.getId());
-          mutableSegments.add(segment);
+          mutableSegments.add(segment.getId());
         } else {
           log.debug("Added new immutable segment[%s].", segment.getId());
         }
@@ -387,11 +389,11 @@ public class DruidSchema extends AbstractSchema
             .from(segmentMetadata)
             .withReplicas(servers)
             .build();
-        knownSegments.put(segment, metadataWithNumReplicas);
+        knownSegments.put(segment.getId(), metadataWithNumReplicas);
         if (server.segmentReplicatable()) {
           // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
           // even if it's also available on non-replicatable (realtime) servers.
-          mutableSegments.remove(segment);
+          mutableSegments.remove(segment.getId());
           log.debug("Segment[%s] has become immutable.", segment.getId());
         }
       }
@@ -410,12 +412,12 @@ public class DruidSchema extends AbstractSchema
       log.debug("Segment[%s] is gone.", segment.getId());

       dataSourcesNeedingRebuild.add(segment.getDataSource());
-      segmentsNeedingRefresh.remove(segment);
-      mutableSegments.remove(segment);
+      segmentsNeedingRefresh.remove(segment.getId());
+      mutableSegments.remove(segment.getId());

-      final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
+      final Map<SegmentId, AvailableSegmentMetadata> dataSourceSegments =
           segmentMetadataInfo.get(segment.getDataSource());
-      if (dataSourceSegments.remove(segment) != null) {
+      if (dataSourceSegments.remove(segment.getId()) != null) {
         totalSegments--;
       }

@@ -433,8 +435,8 @@ public class DruidSchema extends AbstractSchema
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
-      final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
-      final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment);
+      final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId());
       final Set<String> segmentServers = segmentMetadata.getReplicas();
       final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
           .filter(Predicates.not(Predicates.equalTo(server.getName())))
@@ -443,7 +445,7 @@ public class DruidSchema extends AbstractSchema
           .from(segmentMetadata)
           .withReplicas(servers)
           .build();
-      knownSegments.put(segment, metadataWithNumReplicas);
+      knownSegments.put(segment.getId(), metadataWithNumReplicas);
       lock.notifyAll();
     }
   }
@@ -453,19 +455,19 @@ public class DruidSchema extends AbstractSchema
    * which may be a subset of the asked-for set.
    */
   @VisibleForTesting
-  Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+  Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
   {
-    final Set<DataSegment> retVal = new HashSet<>();
+    final Set<SegmentId> retVal = new HashSet<>();

     // Organize segments by dataSource.
-    final Map<String, TreeSet<DataSegment>> segmentMap = new TreeMap<>();
+    final Map<String, TreeSet<SegmentId>> segmentMap = new TreeMap<>();

-    for (DataSegment segment : segments) {
-      segmentMap.computeIfAbsent(segment.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
-                .add(segment);
+    for (SegmentId segmentId : segments) {
+      segmentMap.computeIfAbsent(segmentId.getDataSource(), x -> new TreeSet<>(SEGMENT_ORDER))
+                .add(segmentId);
     }

-    for (Map.Entry<String, TreeSet<DataSegment>> entry : segmentMap.entrySet()) {
+    for (Map.Entry<String, TreeSet<SegmentId>> entry : segmentMap.entrySet()) {
       final String dataSource = entry.getKey();
       retVal.addAll(refreshSegmentsForDataSource(dataSource, entry.getValue()));
     }
@@ -477,17 +479,22 @@ public class DruidSchema extends AbstractSchema
    * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
    * segments actually refreshed, which may be a subset of the asked-for set.
    */
-  private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, final Set<DataSegment> segments)
+  private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
       throws IOException
   {
+    if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
+      // Sanity check. We definitely expect this to pass.
+      throw new ISE("'segments' must all match 'dataSource'!");
+    }
+
     log.debug("Refreshing metadata for dataSource[%s].", dataSource);

     final long startTime = System.currentTimeMillis();

-    // Segment id -> segment object.
-    final Map<String, DataSegment> segmentMap = Maps.uniqueIndex(segments, segment -> segment.getId().toString());
+    // Segment id string -> SegmentId object.
+    final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);

-    final Set<DataSegment> retVal = new HashSet<>();
+    final Set<SegmentId> retVal = new HashSet<>();
     final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
         queryLifecycleFactory,
         Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
@@ -499,24 +506,28 @@ public class DruidSchema extends AbstractSchema
     try {
       while (!yielder.isDone()) {
         final SegmentAnalysis analysis = yielder.get();
-        final DataSegment segment = segmentMap.get(analysis.getId());
+        final SegmentId segmentId = segmentIdMap.get(analysis.getId());

-        if (segment == null) {
+        if (segmentId == null) {
           log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
         } else {
           synchronized (lock) {
             final RowSignature rowSignature = analysisToRowSignature(analysis);
-            log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
-            final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
-                segmentMetadataInfo.get(segment.getDataSource());
+            log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
+            final Map<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.get(dataSource);
             if (dataSourceSegments == null) {
-              log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
+              // Datasource may have been removed or become unavailable while this refresh was ongoing.
+              log.warn(
+                  "No segment map found with datasource[%s], skipping refresh of segment[%s]",
+                  dataSource,
+                  segmentId
+              );
             } else {
-              final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segment);
+              final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segmentId);
               if (segmentMetadata == null) {
                 log.warn(
                     "No segment[%s] found, skipping refresh",
-                    segment.getId()
+                    segmentId
                 );
               } else {
                 final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
@@ -524,9 +535,9 @@ public class DruidSchema extends AbstractSchema
                     .withRowSignature(rowSignature)
                     .withNumRows(analysis.getNumRows())
                     .build();
-                dataSourceSegments.put(segment, updatedSegmentMetadata);
-                setAvailableSegmentMetadata(segment, updatedSegmentMetadata);
-                retVal.add(segment);
+                dataSourceSegments.put(segmentId, updatedSegmentMetadata);
+                setAvailableSegmentMetadata(segmentId, updatedSegmentMetadata);
+                retVal.add(segmentId);
               }
             }
           }
@@ -551,14 +562,14 @@ public class DruidSchema extends AbstractSchema
   }

   @VisibleForTesting
-  void setAvailableSegmentMetadata(final DataSegment segment, final AvailableSegmentMetadata availableSegmentMetadata)
+  void setAvailableSegmentMetadata(final SegmentId segmentId, final AvailableSegmentMetadata availableSegmentMetadata)
   {
     synchronized (lock) {
-      TreeMap<DataSegment, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
-          segment.getDataSource(),
+      TreeMap<SegmentId, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
+          segmentId.getDataSource(),
           x -> new TreeMap<>(SEGMENT_ORDER)
       );
-      if (dataSourceSegments.put(segment, availableSegmentMetadata) == null) {
+      if (dataSourceSegments.put(segmentId, availableSegmentMetadata) == null) {
         totalSegments++;
       }
     }
@@ -567,7 +578,7 @@ public class DruidSchema extends AbstractSchema
   private DruidTable buildDruidTable(final String dataSource)
   {
     synchronized (lock) {
-      final Map<DataSegment, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
+      final Map<SegmentId, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
       final Map<String, ValueType> columnTypes = new TreeMap<>();

       if (segmentMap != null) {
@@ -590,19 +601,19 @@ public class DruidSchema extends AbstractSchema
   private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
       final QueryLifecycleFactory queryLifecycleFactory,
-      final Iterable<DataSegment> segments,
+      final Iterable<SegmentId> segments,
       final AuthenticationResult authenticationResult
   )
   {
     // Sanity check: getOnlyElement of a set, to ensure all segments have the same dataSource.
     final String dataSource = Iterables.getOnlyElement(
         StreamSupport.stream(segments.spliterator(), false)
-                     .map(DataSegment::getDataSource).collect(Collectors.toSet())
+                     .map(SegmentId::getDataSource).collect(Collectors.toSet())
     );

     final MultipleSpecificSegmentSpec querySegmentSpec = new MultipleSpecificSegmentSpec(
         StreamSupport.stream(segments.spliterator(), false)
-                     .map(DataSegment::toDescriptor).collect(Collectors.toList())
+                     .map(SegmentId::toDescriptor).collect(Collectors.toList())
     );

     final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
@@ -643,11 +654,11 @@ public class DruidSchema extends AbstractSchema
     return rowSignatureBuilder.build();
   }

-  Map<DataSegment, AvailableSegmentMetadata> getSegmentMetadata()
+  Map<SegmentId, AvailableSegmentMetadata> getSegmentMetadataSnapshot()
   {
-    final Map<DataSegment, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
     synchronized (lock) {
-      for (TreeMap<DataSegment, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
+      for (TreeMap<SegmentId, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
         segmentMetadata.putAll(val);
       }
     }
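With every map in `DruidSchema` now keyed by `SegmentId`, callers convert a `DataSegment` at the boundary and then work with the lightweight id. A sketch of the resulting call pattern, assuming same-package access like the tests have (`schema` and `segments` are hypothetical inputs):

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

class DruidSchemaUsageSketch
{
  // Sketch: look up metadata for each segment by id, then ask for a bulk refresh.
  static Set<SegmentId> lookUpAndRefresh(DruidSchema schema, Collection<DataSegment> segments)
      throws IOException
  {
    Map<SegmentId, AvailableSegmentMetadata> snapshot = schema.getSegmentMetadataSnapshot();
    for (DataSegment segment : segments) {
      // May be null if the segment vanished while we were iterating.
      AvailableSegmentMetadata metadata = snapshot.get(segment.getId());
    }
    return schema.refreshSegments(
        segments.stream().map(DataSegment::getId).collect(Collectors.toSet())
    );
  }
}
```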
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index f863104ff85..f80ce7376e5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -255,8 +255,9 @@ public class SystemSchema extends AbstractSchema
     public Enumerable<Object[]> scan(DataContext root)
     {
       //get available segments from druidSchema
-      final Map<DataSegment, AvailableSegmentMetadata> availableSegmentMetadata = druidSchema.getSegmentMetadata();
-      final Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries =
+      final Map<SegmentId, AvailableSegmentMetadata> availableSegmentMetadata =
+          druidSchema.getSegmentMetadataSnapshot();
+      final Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries =
           availableSegmentMetadata.entrySet().iterator();

       // in memory map to store segment data from available segments
@@ -265,10 +266,11 @@ public class SystemSchema extends AbstractSchema
       for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
             new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
-        partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
+        partialSegmentDataMap.put(h.getSegment().getId(), partialSegmentData);
       }

-      //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
+      // get published segments from metadata segment cache (if enabled in sql planner config), else directly from
+      // coordinator
       final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();

       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
@@ -319,19 +321,19 @@ public class SystemSchema extends AbstractSchema
           ))
           .transform(val -> {
             try {
-              if (segmentsAlreadySeen.contains(val.getKey().getId())) {
+              if (segmentsAlreadySeen.contains(val.getKey())) {
                 return null;
               }
-              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getId());
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
               final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
               return new Object[]{
-                  val.getKey().getId(),
+                  val.getKey(),
                   val.getKey().getDataSource(),
                   val.getKey().getInterval().getStart().toString(),
                   val.getKey().getInterval().getEnd().toString(),
-                  val.getKey().getSize(),
+                  val.getValue().getSegment().getSize(),
                   val.getKey().getVersion(),
-                  Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
+                  (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
                   numReplicas,
                   val.getValue().getNumRows(),
                   IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
@@ -342,7 +344,7 @@ public class SystemSchema extends AbstractSchema
               };
             }
             catch (JsonProcessingException e) {
-              throw new RE(e, "Error getting segment payload for segment %s", val.getKey().getId());
+              throw new RE(e, "Error getting segment payload for segment %s", val.getKey());
             }
           });

@@ -371,18 +373,18 @@ public class SystemSchema extends AbstractSchema
       return authorizedSegments.iterator();
     }

-    private Iterator<Entry<DataSegment, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
-        Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries,
+    private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
+        Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries,
         DataContext root
     )
     {
       final AuthenticationResult authenticationResult =
           (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);

-      Function<Entry<DataSegment, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
+      Function<Entry<SegmentId, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
          .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource()));

-      final Iterable<Entry<DataSegment, AvailableSegmentMetadata>> authorizedSegments =
+      final Iterable<Entry<SegmentId, AvailableSegmentMetadata>> authorizedSegments =
           AuthorizationUtils.filterAuthorizedResources(
               authenticationResult,
               () -> availableSegmentEntries,
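One consequence surfaced in this file: a bare `SegmentId` key carries dataSource, interval, version, and partition number, but not size or shard spec, so those two columns are now read from the `DataSegment` held inside the metadata value. A sketch of just that projection (the entry variable is hypothetical):

```java
import java.util.Map;
import org.apache.druid.sql.calcite.schema.AvailableSegmentMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

class SegmentsTableRowSketch
{
  // Sketch: the two row fields that moved from the map key to the map value in this change.
  static Object[] sizeAndPartition(Map.Entry<SegmentId, AvailableSegmentMetadata> entry)
  {
    DataSegment segment = entry.getValue().getSegment();
    // getPartitionNum() returns an int; the cast keeps the column a long, as before.
    return new Object[]{segment.getSize(), (long) segment.getShardSpec().getPartitionNum()};
  }
}
```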
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index d607587db4a..dcabfbae3f0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -66,7 +67,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;

 public class DruidSchemaTest extends CalciteTestBase
 {
@@ -254,8 +255,11 @@ public class DruidSchemaTest extends CalciteTestBase
   @Test
   public void testSegmentMetadataHolderNumRows()
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadata();
-    final Set<DataSegment> segments = segmentsMetadata.keySet();
+    Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentsMetadata.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(3, segments.size());
     // find the only segment with datasource "foo2"
     final DataSegment existingSegment = segments.stream()
                                                 .filter(segment -> segment.getDataSource().equals("foo2"))
                                                 .findFirst()
                                                 .orElse(null);
     Assert.assertNotNull(existingSegment);
-    final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment);
+    final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment.getId());
     // update AvailableSegmentMetadata of existingSegment with numRows=5
     AvailableSegmentMetadata updatedMetadata = AvailableSegmentMetadata.from(existingMetadata).withNumRows(5).build();
-    schema.setAvailableSegmentMetadata(existingSegment, updatedMetadata);
+    schema.setAvailableSegmentMetadata(existingSegment.getId(), updatedMetadata);
     // find a druidServer holding existingSegment
     final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
         .stream()
         .flatMap(druidServer -> druidServer
             .getLazyAllSegments().stream()
-            .filter(segment -> segment.equals(existingSegment))
+            .filter(segment -> segment.getId().equals(existingSegment.getId()))
             .map(segment -> Pair.of(druidServer, segment))
         )
         .findAny()
         .orElse(null);
@@ -283,14 +287,14 @@ public class DruidSchemaTest extends CalciteTestBase
     final DruidServerMetadata druidServerMetadata = server.getMetadata();
     // invoke DruidSchema#addSegment on existingSegment
     schema.addSegment(druidServerMetadata, existingSegment);
-    segmentsMetadata = schema.getSegmentMetadata();
+    segmentsMetadata = schema.getSegmentMetadataSnapshot();
     // get the only segment with datasource "foo2"
     final DataSegment currentSegment = segments.stream()
                                                .filter(segment -> segment.getDataSource().equals("foo2"))
                                                .findFirst()
                                                .orElse(null);
-    final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment);
-    Assert.assertEquals(updatedMetadata.getSegmentId(), currentMetadata.getSegmentId());
+    final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment.getId());
+    Assert.assertEquals(updatedMetadata.getSegment().getId(), currentMetadata.getSegment().getId());
     Assert.assertEquals(updatedMetadata.getNumRows(), currentMetadata.getNumRows());
     // numreplicas do not change here since we addSegment with the same server which was serving existingSegment before
     Assert.assertEquals(updatedMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
@@ -299,8 +303,11 @@ public class DruidSchemaTest extends CalciteTestBase
   @Test
   public void testNullDatasource() throws IOException
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
-    Set<DataSegment> segments = segmentMetadatas.keySet();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentMetadatas.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(segments.size(), 3);
     // segments contains two segments with datasource "foo" and one with datasource "foo2"
     // let's remove the only segment with datasource "foo2"
@@ -308,31 +315,34 @@ public class DruidSchemaTest extends CalciteTestBase
         .filter(segment -> segment.getDataSource().equals("foo2"))
         .findFirst()
         .orElse(null);
-    Assert.assertFalse(segmentToRemove == null);
+    Assert.assertNotNull(segmentToRemove);
     schema.removeSegment(segmentToRemove);
-    schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
-    segmentMetadatas = schema.getSegmentMetadata();
-    segments = segmentMetadatas.keySet();
-    Assert.assertEquals(segments.size(), 2);
+
+    // The following line can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
+    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
+    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
   }

   @Test
   public void testNullAvailableSegmentMetadata() throws IOException
   {
-    Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
-    Set<DataSegment> segments = segmentMetadatas.keySet();
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentMetadatas.values()
+                                                       .stream()
+                                                       .map(AvailableSegmentMetadata::getSegment)
+                                                       .collect(Collectors.toList());
     Assert.assertEquals(segments.size(), 3);
     // remove one of the segments with datasource "foo"
     final DataSegment segmentToRemove = segments.stream()
         .filter(segment -> segment.getDataSource().equals("foo"))
         .findFirst()
         .orElse(null);
-    Assert.assertFalse(segmentToRemove == null);
+    Assert.assertNotNull(segmentToRemove);
     schema.removeSegment(segmentToRemove);
-    schema.refreshSegments(segments); // can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
-    segmentMetadatas = schema.getSegmentMetadata();
-    segments = segmentMetadatas.keySet();
-    Assert.assertEquals(segments.size(), 2);
+
+    // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
+    schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
+    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
  }
 }
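Finally, a detail worth noting about the `SEGMENT_ORDER` change in DruidSchema above: the comparator still ends with `thenComparing(Function.identity())`, which now relies on `SegmentId` being `Comparable`. That tiebreaker is what keeps two distinct ids with the same interval start from collapsing into one entry in the `TreeSet`/`TreeMap`s. The same idiom in isolation (illustrative only):

```java
import java.util.Comparator;
import java.util.TreeSet;
import java.util.function.Function;
import org.apache.druid.timeline.SegmentId;

class SegmentOrderSketch
{
  // Newest interval start first; ties broken by SegmentId's natural order so that
  // two different ids covering the same interval both survive in a sorted set.
  static TreeSet<SegmentId> newestFirstSet()
  {
    Comparator<SegmentId> newestFirst = Comparator
        .comparing((SegmentId id) -> id.getInterval().getStart())
        .reversed()
        .thenComparing(Function.identity());
    return new TreeSet<>(newestFirst);
  }
}
```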