Fix `is_realtime` column behavior in sys.segments table (#8154)

* Fix is_realtime flag

* make variable final

* minor changes

* Modify is_realtime behavior based on review comment

* Fix UT
This commit is contained in:
Surekha 2019-07-31 21:26:49 -07:00 committed by David Lim
parent 41893d4647
commit f0ecdfee30
5 changed files with 133 additions and 30 deletions

View File

@ -655,7 +655,7 @@ Note that a segment can be served by more than one stream ingestion task or His
|num_rows|LONG|Number of rows in current segment, this value could be null if unknown to Broker at query time| |num_rows|LONG|Number of rows in current segment, this value could be null if unknown to Broker at query time|
|is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`| |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`|
|is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)| |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)|
|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment|
|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet.
|payload|STRING|JSON-serialized data segment payload| |payload|STRING|JSON-serialized data segment payload|

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema; package org.apache.druid.sql.calcite.schema;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -34,7 +35,7 @@ public class AvailableSegmentMetadata
public static Builder builder( public static Builder builder(
DataSegment segment, DataSegment segment,
long isRealtime, long isRealtime,
Set<String> segmentServers, Set<DruidServerMetadata> segmentServers,
RowSignature rowSignature, RowSignature rowSignature,
long numRows long numRows
) )
@ -58,7 +59,7 @@ public class AvailableSegmentMetadata
// to make it easy to count number of segments which are realtime // to make it easy to count number of segments which are realtime
private final long isRealtime; private final long isRealtime;
// set of servers that contain the segment // set of servers that contain the segment
private final Set<String> segmentServers; private final Set<DruidServerMetadata> segmentServers;
private final long numRows; private final long numRows;
@Nullable @Nullable
private final RowSignature rowSignature; private final RowSignature rowSignature;
@ -82,7 +83,7 @@ public class AvailableSegmentMetadata
return segment; return segment;
} }
public Set<String> getReplicas() public Set<DruidServerMetadata> getReplicas()
{ {
return segmentServers; return segmentServers;
} }
@ -106,9 +107,9 @@ public class AvailableSegmentMetadata
public static class Builder public static class Builder
{ {
private final DataSegment segment; private final DataSegment segment;
private final long isRealtime;
private Set<String> segmentServers; private long isRealtime;
private Set<DruidServerMetadata> segmentServers;
@Nullable @Nullable
private RowSignature rowSignature; private RowSignature rowSignature;
private long numRows; private long numRows;
@ -116,7 +117,7 @@ public class AvailableSegmentMetadata
private Builder( private Builder(
DataSegment segment, DataSegment segment,
long isRealtime, long isRealtime,
Set<String> servers, Set<DruidServerMetadata> servers,
@Nullable RowSignature rowSignature, @Nullable RowSignature rowSignature,
long numRows long numRows
) )
@ -140,12 +141,18 @@ public class AvailableSegmentMetadata
return this; return this;
} }
public Builder withReplicas(Set<String> servers) public Builder withReplicas(Set<DruidServerMetadata> servers)
{ {
this.segmentServers = servers; this.segmentServers = servers;
return this; return this;
} }
public Builder withRealtime(long isRealtime)
{
this.isRealtime = isRealtime;
return this;
}
public AvailableSegmentMetadata build() public AvailableSegmentMetadata build()
{ {
return new AvailableSegmentMetadata(this); return new AvailableSegmentMetadata(this);

View File

@ -57,6 +57,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator; import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -73,6 +74,7 @@ import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
@ -359,14 +361,12 @@ public class DruidSchema extends AbstractSchema
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
if (segmentMetadata == null) { if (segmentMetadata == null) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not // segmentReplicatable is used to determine if segments are served by historical or realtime servers
final long isRealtime = server.segmentReplicatable() ? 0 : 1; long isRealtime = server.segmentReplicatable() ? 0 : 1;
final Set<String> servers = ImmutableSet.of(server.getName());
segmentMetadata = AvailableSegmentMetadata.builder( segmentMetadata = AvailableSegmentMetadata.builder(
segment, segment,
isRealtime, isRealtime,
servers, ImmutableSet.of(server),
null, null,
DEFAULT_NUM_ROWS DEFAULT_NUM_ROWS
).build(); ).build();
@ -380,14 +380,15 @@ public class DruidSchema extends AbstractSchema
log.debug("Added new immutable segment[%s].", segment.getId()); log.debug("Added new immutable segment[%s].", segment.getId());
} }
} else { } else {
final Set<String> segmentServers = segmentMetadata.getReplicas(); final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>() final ImmutableSet<DruidServerMetadata> servers = new ImmutableSet.Builder<DruidServerMetadata>()
.addAll(segmentServers) .addAll(segmentServers)
.add(server.getName()) .add(server)
.build(); .build();
final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
.from(segmentMetadata) .from(segmentMetadata)
.withReplicas(servers) .withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build(); .build();
knownSegments.put(segment.getId(), metadataWithNumReplicas); knownSegments.put(segment.getId(), metadataWithNumReplicas);
if (server.segmentReplicatable()) { if (server.segmentReplicatable()) {
@ -431,19 +432,23 @@ public class DruidSchema extends AbstractSchema
} }
} }
private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) @VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{ {
synchronized (lock) { synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId());
final Set<String> segmentServers = segmentMetadata.getReplicas(); final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<String> servers = FluentIterable.from(segmentServers) final ImmutableSet<DruidServerMetadata> servers = FluentIterable
.filter(Predicates.not(Predicates.equalTo(server.getName()))) .from(segmentServers)
.toSet(); .filter(Predicates.not(Predicates.equalTo(server)))
.toSet();
final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
.from(segmentMetadata) .from(segmentMetadata)
.withReplicas(servers) .withReplicas(servers)
.withRealtime(recomputeIsRealtime(servers))
.build(); .build();
knownSegments.put(segment.getId(), metadataWithNumReplicas); knownSegments.put(segment.getId(), metadataWithNumReplicas);
lock.notifyAll(); lock.notifyAll();
@ -475,6 +480,18 @@ public class DruidSchema extends AbstractSchema
return retVal; return retVal;
} }
private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
{
final Optional<DruidServerMetadata> historicalServer = servers
.stream()
.filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
.findAny();
// if there is any historical server in the replicas, isRealtime flag should be unset
final long isRealtime = historicalServer.isPresent() ? 0 : 1;
return isRealtime;
}
/** /**
* Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
* segments actually refreshed, which may be a subset of the asked-for set. * segments actually refreshed, which may be a subset of the asked-for set.

View File

@ -43,6 +43,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.DruidTable;
@ -54,6 +55,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -168,8 +170,20 @@ public class DruidSchemaTest extends CalciteTestBase
.build(), .build(),
index2 index2
); );
final DataSegment segment1 = new DataSegment(
final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments()); "foo3",
Intervals.of("2012/2013"),
"version3",
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
new NumberedShardSpec(2, 3),
1,
100L,
DataSegment.PruneLoadSpecHolder.DEFAULT
);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment1);
final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
druidServers = serverView.getDruidServers(); druidServers = serverView.getDruidServers();
schema = new DruidSchema( schema = new DruidSchema(
@ -253,14 +267,14 @@ public class DruidSchemaTest extends CalciteTestBase
* is called more than once for same segment * is called more than once for same segment
*/ */
@Test @Test
public void testSegmentMetadataHolderNumRows() public void testAvailableSegmentMetadataNumRows()
{ {
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot(); Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentsMetadata.values() final List<DataSegment> segments = segmentsMetadata.values()
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(3, segments.size()); Assert.assertEquals(4, segments.size());
// find the only segment with datasource "foo2" // find the only segment with datasource "foo2"
final DataSegment existingSegment = segments.stream() final DataSegment existingSegment = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo2")) .filter(segment -> segment.getDataSource().equals("foo2"))
@ -309,7 +323,7 @@ public class DruidSchemaTest extends CalciteTestBase
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(segments.size(), 3); Assert.assertEquals(4, segments.size());
// segments contains two segments with datasource "foo" and one with datasource "foo2" // segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2" // let's remove the only segment with datasource "foo2"
final DataSegment segmentToRemove = segments.stream() final DataSegment segmentToRemove = segments.stream()
@ -321,7 +335,7 @@ public class DruidSchemaTest extends CalciteTestBase
// The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
} }
@Test @Test
@ -332,7 +346,7 @@ public class DruidSchemaTest extends CalciteTestBase
.stream() .stream()
.map(AvailableSegmentMetadata::getSegment) .map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList()); .collect(Collectors.toList());
Assert.assertEquals(segments.size(), 3); Assert.assertEquals(4, segments.size());
// remove one of the segments with datasource "foo" // remove one of the segments with datasource "foo"
final DataSegment segmentToRemove = segments.stream() final DataSegment segmentToRemove = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo")) .filter(segment -> segment.getDataSource().equals("foo"))
@ -343,7 +357,61 @@ public class DruidSchemaTest extends CalciteTestBase
// The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource // The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet())); schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2); Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
}
@Test
public void testAvailableSegmentMetadataIsRealtime()
{
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentsMetadata.values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());
// find the only realtime segment with datasource "foo3"
final DataSegment existingSegment = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo3"))
.findFirst()
.orElse(null);
Assert.assertNotNull(existingSegment);
final AvailableSegmentMetadata metadata = segmentsMetadata.get(existingSegment.getId());
Assert.assertEquals(1L, metadata.isRealtime());
// get the historical server
final ImmutableDruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);
Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
// add existingSegment to historical
schema.addSegment(historicalServerMetadata, existingSegment);
segmentsMetadata = schema.getSegmentMetadataSnapshot();
// get the segment with datasource "foo3"
DataSegment currentSegment = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo3"))
.findFirst()
.orElse(null);
Assert.assertNotNull(currentSegment);
AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment.getId());
Assert.assertEquals(0L, currentMetadata.isRealtime());
ImmutableDruidServer realtimeServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.REALTIME))
.findAny()
.orElse(null);
Assert.assertNotNull(realtimeServer);
// drop existingSegment from realtime task
schema.removeServerSegment(realtimeServer.getMetadata(), existingSegment);
segmentsMetadata = schema.getSegmentMetadataSnapshot();
currentSegment = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo3"))
.findFirst()
.orElse(null);
Assert.assertNotNull(currentSegment);
currentMetadata = segmentsMetadata.get(currentSegment.getId());
Assert.assertEquals(0L, currentMetadata.isRealtime());
} }
} }

View File

@ -93,7 +93,18 @@ public class TestServerInventoryView implements TimelineServerView
ImmutableMap.of("src", dataSource), ImmutableMap.of("src", dataSource),
1 1
); );
return ImmutableList.of(server); final ImmutableDruidDataSource dataSource2 = new ImmutableDruidDataSource(
"DUMMY2",
Collections.emptyMap(),
realtimeSegments
);
final ImmutableDruidServer realtimeServer = new ImmutableDruidServer(
DUMMY_SERVER_REALTIME,
0L,
ImmutableMap.of("src", dataSource2),
1
);
return ImmutableList.of(server, realtimeServer);
} }
@Override @Override