diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 8dfb0e8ff8a..3678024e44d 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -655,7 +655,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His |num_rows|LONG|Number of rows in current segment, this value could be null if unkown to Broker at query time| |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`| |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)| -|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks| +|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment| |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. |payload|STRING|JSON-serialized data segment payload| diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java index 1dc8e1ed779..4efff1130f1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; @@ -34,7 +35,7 @@ public class AvailableSegmentMetadata public static Builder builder( DataSegment segment, long isRealtime, - Set segmentServers, + Set segmentServers, RowSignature rowSignature, long numRows ) @@ -58,7 +59,7 @@ public class AvailableSegmentMetadata // to make it easy to count number of segments which are realtime private final long isRealtime; // set of servers that contain the segment - private final Set segmentServers; + private final Set segmentServers; private final long numRows; @Nullable private final RowSignature rowSignature; @@ -82,7 +83,7 @@ public class AvailableSegmentMetadata return segment; } - public Set getReplicas() + public Set getReplicas() { return segmentServers; } @@ -106,9 +107,9 @@ public class AvailableSegmentMetadata public static class Builder { private final DataSegment segment; - private final long isRealtime; - private Set segmentServers; + private long isRealtime; + private Set segmentServers; @Nullable private RowSignature rowSignature; private long numRows; @@ -116,7 +117,7 @@ public class AvailableSegmentMetadata private Builder( DataSegment segment, long isRealtime, - Set servers, + Set servers, @Nullable RowSignature rowSignature, long numRows ) @@ -140,12 +141,18 @@ public class AvailableSegmentMetadata return this; } - public Builder withReplicas(Set servers) + public Builder withReplicas(Set servers) { this.segmentServers = servers; return this; } + public Builder withRealtime(long isRealtime) + { + this.isRealtime = isRealtime; + return this; + } + public AvailableSegmentMetadata build() { return new AvailableSegmentMetadata(this); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 2116fbb38d6..55a9a7911c8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -57,6 +57,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -73,6 +74,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -359,14 +361,12 @@ public class DruidSchema extends AbstractSchema final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; if (segmentMetadata == null) { - // segmentReplicatable is used to determine if segments are served by realtime servers or not - final long isRealtime = server.segmentReplicatable() ? 0 : 1; - - final Set servers = ImmutableSet.of(server.getName()); + // segmentReplicatable is used to determine if segments are served by historical or realtime servers + long isRealtime = server.segmentReplicatable() ? 0 : 1; segmentMetadata = AvailableSegmentMetadata.builder( segment, isRealtime, - servers, + ImmutableSet.of(server), null, DEFAULT_NUM_ROWS ).build(); @@ -380,14 +380,15 @@ public class DruidSchema extends AbstractSchema log.debug("Added new immutable segment[%s].", segment.getId()); } } else { - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = new ImmutableSet.Builder() + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = new ImmutableSet.Builder() .addAll(segmentServers) - .add(server.getName()) + .add(server) .build(); final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata .from(segmentMetadata) .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) .build(); knownSegments.put(segment.getId(), metadataWithNumReplicas); if (server.segmentReplicatable()) { @@ -431,19 +432,23 @@ public class DruidSchema extends AbstractSchema } } - private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) + @VisibleForTesting + void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = FluentIterable.from(segmentServers) - .filter(Predicates.not(Predicates.equalTo(server.getName()))) - .toSet(); + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = FluentIterable + .from(segmentServers) + .filter(Predicates.not(Predicates.equalTo(server))) + .toSet(); + final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata .from(segmentMetadata) .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) .build(); knownSegments.put(segment.getId(), metadataWithNumReplicas); lock.notifyAll(); @@ -475,6 +480,18 @@ public class DruidSchema extends AbstractSchema return retVal; } + private long recomputeIsRealtime(ImmutableSet servers) + { + final Optional historicalServer = servers + .stream() + .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)) + .findAny(); + + // if there is any historical server in the replicas, isRealtime flag should be unset + final long isRealtime = historicalServer.isPresent() ? 0 : 1; + return isRealtime; + } + /** * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of * segments actually refreshed, which may be a subset of the asked-for set. diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 08fcb7ebe60..1b2573e8583 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.table.DruidTable; @@ -54,6 +55,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -168,8 +170,20 @@ public class DruidSchemaTest extends CalciteTestBase .build(), index2 ); - - final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments()); + final DataSegment segment1 = new DataSegment( + "foo3", + Intervals.of("2012/2013"), + "version3", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + new NumberedShardSpec(2, 3), + 1, + 100L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + final List realtimeSegments = ImmutableList.of(segment1); + final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); schema = new DruidSchema( @@ -253,14 +267,14 @@ public class DruidSchemaTest extends CalciteTestBase * is called more than once for same segment */ @Test - public void testSegmentMetadataHolderNumRows() + public void testAvailableSegmentMetadataNumRows() { Map segmentsMetadata = schema.getSegmentMetadataSnapshot(); final List segments = segmentsMetadata.values() .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(3, segments.size()); + Assert.assertEquals(4, segments.size()); // find the only segment with datasource "foo2" final DataSegment existingSegment = segments.stream() .filter(segment -> segment.getDataSource().equals("foo2")) @@ -309,7 +323,7 @@ public class DruidSchemaTest extends CalciteTestBase .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(segments.size(), 3); + Assert.assertEquals(4, segments.size()); // segments contains two segments with datasource "foo" and one with datasource "foo2" // let's remove the only segment with datasource "foo2" final DataSegment segmentToRemove = segments.stream() @@ -321,7 +335,7 @@ public class DruidSchemaTest extends CalciteTestBase // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); - Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2); + Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); } @Test @@ -332,7 +346,7 @@ public class DruidSchemaTest extends CalciteTestBase .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); - Assert.assertEquals(segments.size(), 3); + Assert.assertEquals(4, segments.size()); // remove one of the segments with datasource "foo" final DataSegment segmentToRemove = segments.stream() .filter(segment -> segment.getDataSource().equals("foo")) @@ -343,7 +357,61 @@ public class DruidSchemaTest extends CalciteTestBase // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); - Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2); + Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size()); + } + + @Test + public void testAvailableSegmentMetadataIsRealtime() + { + Map segmentsMetadata = schema.getSegmentMetadataSnapshot(); + final List segments = segmentsMetadata.values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + // find the only realtime segment with datasource "foo3" + final DataSegment existingSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo3")) + .findFirst() + .orElse(null); + Assert.assertNotNull(existingSegment); + final AvailableSegmentMetadata metadata = segmentsMetadata.get(existingSegment.getId()); + Assert.assertEquals(1L, metadata.isRealtime()); + // get the historical server + final ImmutableDruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); + + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + // add existingSegment to historical + schema.addSegment(historicalServerMetadata, existingSegment); + segmentsMetadata = schema.getSegmentMetadataSnapshot(); + // get the segment with datasource "foo3" + DataSegment currentSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo3")) + .findFirst() + .orElse(null); + Assert.assertNotNull(currentSegment); + AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment.getId()); + Assert.assertEquals(0L, currentMetadata.isRealtime()); + + ImmutableDruidServer realtimeServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.REALTIME)) + .findAny() + .orElse(null); + Assert.assertNotNull(realtimeServer); + // drop existingSegment from realtime task + schema.removeServerSegment(realtimeServer.getMetadata(), existingSegment); + segmentsMetadata = schema.getSegmentMetadataSnapshot(); + currentSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo3")) + .findFirst() + .orElse(null); + Assert.assertNotNull(currentSegment); + currentMetadata = segmentsMetadata.get(currentSegment.getId()); + Assert.assertEquals(0L, currentMetadata.isRealtime()); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 2dcc5695983..0f36273035a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -93,7 +93,18 @@ public class TestServerInventoryView implements TimelineServerView ImmutableMap.of("src", dataSource), 1 ); - return ImmutableList.of(server); + final ImmutableDruidDataSource dataSource2 = new ImmutableDruidDataSource( + "DUMMY2", + Collections.emptyMap(), + realtimeSegments + ); + final ImmutableDruidServer realtimeServer = new ImmutableDruidServer( + DUMMY_SERVER_REALTIME, + 0L, + ImmutableMap.of("src", dataSource2), + 1 + ); + return ImmutableList.of(server, realtimeServer); } @Override