diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 0e7e9734646..87d97ef29e9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -50,7 +50,6 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -339,10 +338,9 @@ public class DruidSchema extends AbstractSchema if (knownSegments == null || !knownSegments.containsKey(segment)) { // segmentReplicatable is used to determine if segments are served by realtime servers or not final long isRealtime = server.segmentReplicatable() ? 0 : 1; - final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0; final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder( segment.getIdentifier(), - isPublished, + 0, 1, isRealtime, 1 diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 431f4b1730e..57e8825af46 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -250,7 +250,7 @@ public class SystemSchema extends AbstractSchema try { segmentsAlreadySeen.add(val.getIdentifier()); final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier()); - long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L; + long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L; if (partialSegmentData != null) { numReplicas = partialSegmentData.getNumReplicas(); numRows = partialSegmentData.getNumRows(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 9dc5a0e55f3..4020a8897b3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -73,6 +73,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -203,7 +204,7 @@ public class SystemSchemaTest extends CalciteTestBase druidSchema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - new TestServerInventoryView(walker.getSegments()), + new TestServerInventoryView(walker.getSegments(), realtimeSegments), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() @@ -251,7 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase null, ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), - null, + new NumberedShardSpec(2, 3), 1, 100L, DataSegment.PruneLoadSpecHolder.DEFAULT @@ -281,6 +282,8 @@ public class SystemSchemaTest extends CalciteTestBase DataSegment.PruneLoadSpecHolder.DEFAULT ); + final List realtimeSegments = ImmutableList.of(segment4, segment5); + private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer( new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), 1L, @@ -329,11 +332,6 @@ public class SystemSchemaTest extends CalciteTestBase @Test public void testSegmentsTable() throws Exception { - // total segments = 6 - // segments 1,2,3 are published and available - // segments 4,5,6 are published but unavailable - // segment 3 is published but not served - // segment 2 is served by 2 servers, so num_replicas=2 final SystemSchema.SegmentsTable segmentsTable = EasyMock .createMockBuilder(SystemSchema.SegmentsTable.class) @@ -356,9 +354,9 @@ public class SystemSchemaTest extends CalciteTestBase .anyTimes(); AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - //published but unavailable segments + //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2 final String json = "[{\n" - + "\t\"dataSource\": \"wikipedia-kafka\",\n" + + "\t\"dataSource\": \"wikipedia1\",\n" + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n" + "\t\"loadSpec\": {\n" @@ -376,7 +374,7 @@ public class SystemSchemaTest extends CalciteTestBase + "\t\"size\": 47406,\n" + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n" + "}, {\n" - + "\t\"dataSource\": \"wikipedia-kafka\",\n" + + "\t\"dataSource\": \"wikipedia2\",\n" + "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n" + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n" + "\t\"loadSpec\": {\n" @@ -394,7 +392,7 @@ public class SystemSchemaTest extends CalciteTestBase + "\t\"size\": 83846,\n" + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n" + "}, {\n" - + "\t\"dataSource\": \"wikipedia-kafka\",\n" + + "\t\"dataSource\": \"wikipedia3\",\n" + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n" + "\t\"loadSpec\": {\n" @@ -411,6 +409,34 @@ public class SystemSchemaTest extends CalciteTestBase + "\t\"binaryVersion\": 9,\n" + "\t\"size\": 53527,\n" + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n" + + "}, {\n" + + "\t\"dataSource\": \"test1\",\n" + + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n" + + "\t\"version\": \"version1\",\n" + + "\t\"loadSpec\": null,\n" + + "\t\"dimensions\": \"dim1,dim2\",\n" + + "\t\"metrics\": \"met1,met2\",\n" + + "\t\"shardSpec\": {\n" + + "\t\t\"type\": \"none\",\n" + + "\t\t\"domainDimensions\": []\n" + + "\t},\n" + + "\t\"binaryVersion\": 1,\n" + + "\t\"size\": 100,\n" + + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n" + + "}, {\n" + + "\t\"dataSource\": \"test2\",\n" + + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n" + + "\t\"version\": \"version2\",\n" + + "\t\"loadSpec\": null,\n" + + "\t\"dimensions\": \"dim1,dim2\",\n" + + "\t\"metrics\": \"met1,met2\",\n" + + "\t\"shardSpec\": {\n" + + "\t\t\"type\": \"none\",\n" + + "\t\t\"domainDimensions\": []\n" + + "\t},\n" + + "\t\"binaryVersion\": 1,\n" + + "\t\"size\": 100,\n" + + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n" + "}]"; byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); in.add(bytesToWrite); @@ -447,19 +473,137 @@ public class SystemSchemaTest extends CalciteTestBase final List rows = segmentsTable.scan(dataContext).toList(); - Assert.assertEquals(6, rows.size()); + // total segments = 8 + // segment wikipedia1, wikipedia2, wikipedia3 are published but unavailable + // segments test1, test2 are published and available + // segment test3 is served by historical but unpublished or unused + // segments test4, test5 are not published but available (realtime segments) + + Assert.assertEquals(8, rows.size()); Object[] row0 = rows.get(0); - //segment 6 is published and unavailable, num_replicas is 0 - Assert.assertEquals(1L, row0[9]); - Assert.assertEquals(0L, row0[7]); + //segment 0 is published and unavailable, num_replicas is 0 + Assert.assertEquals( + "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", + row0[0] + ); + Assert.assertEquals("wikipedia1", row0[1]); + Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]); + Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]); + Assert.assertEquals(47406L, row0[4]); + Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]); + Assert.assertEquals(0L, row0[6]); //partition_num + Assert.assertEquals(0L, row0[7]); //num_replicas Assert.assertEquals(0L, row0[8]); //numRows = 0 + Assert.assertEquals(1L, row0[9]); //is_published + Assert.assertEquals(0L, row0[10]); //is_available + Assert.assertEquals(0L, row0[11]); //is_realtime + + Object[] row1 = rows.get(1); + Assert.assertEquals( + "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z", + row1[0] + ); + Assert.assertEquals("wikipedia2", row1[1]); + Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]); + Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]); + Assert.assertEquals(83846L, row1[4]); + Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]); + Assert.assertEquals(0L, row1[6]); //partition_num + Assert.assertEquals(0L, row1[7]); //num_replicas + Assert.assertEquals(0L, row1[8]); //numRows = 0 + Assert.assertEquals(1L, row1[9]); //is_published + Assert.assertEquals(0L, row1[10]); //is_available + Assert.assertEquals(0L, row1[11]); //is_realtime + + + Object[] row2 = rows.get(2); + Assert.assertEquals( + "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", + row2[0] + ); + Assert.assertEquals("wikipedia3", row2[1]); + Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]); + Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]); + Assert.assertEquals(53527L, row2[4]); + Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]); + Assert.assertEquals(0L, row2[6]); //partition_num + Assert.assertEquals(0L, row2[7]); //num_replicas + Assert.assertEquals(0L, row2[8]); //numRows = 0 + Assert.assertEquals(1L, row2[9]); //is_published + Assert.assertEquals(0L, row2[10]); //is_available + Assert.assertEquals(0L, row2[11]); //is_realtime + + Object[] row3 = rows.get(3); + Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]); + Assert.assertEquals("test1", row3[1]); + Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]); + Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]); + Assert.assertEquals(100L, row3[4]); + Assert.assertEquals("version1", row3[5]); + Assert.assertEquals(0L, row3[6]); //partition_num + Assert.assertEquals(1L, row3[7]); //num_replicas + Assert.assertEquals(3L, row3[8]); //numRows = 3 + Assert.assertEquals(1L, row3[9]); //is_published + Assert.assertEquals(1L, row3[10]); //is_available + Assert.assertEquals(0L, row3[11]); //is_realtime Object[] row4 = rows.get(4); - //segment 2 is published and has 2 replicas - Assert.assertEquals(1L, row4[9]); - Assert.assertEquals(2L, row4[7]); - Assert.assertEquals(3L, row4[8]); //numRows = 3 + Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]); + Assert.assertEquals("test2", row4[1]); + Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]); + Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]); + Assert.assertEquals(100L, row4[4]); + Assert.assertEquals("version2", row4[5]); + Assert.assertEquals(0L, row4[6]); //partition_num + Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2 + Assert.assertEquals(3L, row4[8]); //numRows = 3 + Assert.assertEquals(1L, row4[9]); //is_published + Assert.assertEquals(1L, row4[10]); //is_available + Assert.assertEquals(0L, row4[11]); //is_realtime + + Object[] row5 = rows.get(5); + //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2 + Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]); + Assert.assertEquals("test3", row5[1]); + Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]); + Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]); + Assert.assertEquals(100L, row5[4]); + Assert.assertEquals("version3", row5[5]); + Assert.assertEquals(2L, row5[6]); //partition_num = 2 + Assert.assertEquals(1L, row5[7]); //num_replicas + Assert.assertEquals(3L, row5[8]); //numRows = 3 + Assert.assertEquals(0L, row5[9]); //is_published + Assert.assertEquals(1L, row5[10]); //is_available + Assert.assertEquals(0L, row5[11]); //is_realtime + + Object[] row6 = rows.get(6); + Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]); + Assert.assertEquals("test5", row6[1]); + Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]); + Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]); + Assert.assertEquals(100L, row6[4]); + Assert.assertEquals("version5", row6[5]); + Assert.assertEquals(0L, row6[6]); //partition_num + Assert.assertEquals(1L, row6[7]); //num_replicas + Assert.assertEquals(0L, row6[8]); //numRows = 0 + Assert.assertEquals(0L, row6[9]); //is_published + Assert.assertEquals(1L, row6[10]); //is_available + Assert.assertEquals(1L, row6[11]); //is_realtime + + Object[] row7 = rows.get(7); + Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]); + Assert.assertEquals("test4", row7[1]); + Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]); + Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]); + Assert.assertEquals(100L, row7[4]); + Assert.assertEquals("version4", row7[5]); + Assert.assertEquals(0L, row7[6]); //partition_num + Assert.assertEquals(1L, row7[7]); //num_replicas + Assert.assertEquals(0L, row7[8]); //numRows + Assert.assertEquals(0L, row7[9]); //is_published + Assert.assertEquals(1L, row7[10]); //is_available + Assert.assertEquals(1L, row7[11]); //is_realtime // Verify value types. verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE); @@ -576,7 +720,7 @@ public class SystemSchemaTest extends CalciteTestBase Object[] row2 = rows.get(2); Assert.assertEquals("server2:1234", row2[0]); - Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]); + Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]); Object[] row3 = rows.get(3); Assert.assertEquals("server2:1234", row3[0]); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 363d595f38d..1b52ba982fa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -32,6 +32,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; @@ -49,13 +50,29 @@ public class TestServerInventoryView implements TimelineServerView "dummy", 0 ); + private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata( + "dummy", + "dummy", + null, + 0, + ServerType.REALTIME, + "dummy", + 0 + ); private final List segments; + private List realtimeSegments = new ArrayList<>(); public TestServerInventoryView(List segments) { this.segments = ImmutableList.copyOf(segments); } + public TestServerInventoryView(List segments, List realtimeSegments) + { + this.segments = ImmutableList.copyOf(segments); + this.realtimeSegments = ImmutableList.copyOf(realtimeSegments); + } + @Override public TimelineLookup getTimeline(DataSource dataSource) { @@ -75,7 +92,9 @@ public class TestServerInventoryView implements TimelineServerView for (final DataSegment segment : segments) { exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment)); } - + for (final DataSegment segment : realtimeSegments) { + exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); + } exec.execute(callback::segmentViewInitialized); } @@ -85,7 +104,9 @@ public class TestServerInventoryView implements TimelineServerView for (DataSegment segment : segments) { exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment)); } - + for (final DataSegment segment : realtimeSegments) { + exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); + } exec.execute(callback::timelineInitialized); }