Set is_available to false by default for published segment (#6757)

* Set is_available to false by default for published segment

* Address comments

Fix the is_published value for segments not in metadata store

* Remove unused import

* Use non-null shardSpec for a segment in test

* Fix checkstyle

* Modify comment
This commit is contained in:
Surekha 2018-12-20 13:29:00 -08:00 committed by Gian Merlino
parent 6fbf3d635b
commit 5e5aad49e6
4 changed files with 189 additions and 26 deletions

View File

@ -50,7 +50,6 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -339,10 +338,9 @@ public class DruidSchema extends AbstractSchema
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0;
final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
segment.getIdentifier(),
isPublished,
0,
1,
isRealtime,
1

View File

@ -250,7 +250,7 @@ public class SystemSchema extends AbstractSchema
try {
segmentsAlreadySeen.add(val.getIdentifier());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
if (partialSegmentData != null) {
numReplicas = partialSegmentData.getNumReplicas();
numRows = partialSegmentData.getNumRows();

View File

@ -73,6 +73,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -203,7 +204,7 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
new TestServerInventoryView(walker.getSegments(), realtimeSegments),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@ -251,7 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
null,
new NumberedShardSpec(2, 3),
1,
100L,
DataSegment.PruneLoadSpecHolder.DEFAULT
@ -281,6 +282,8 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT
);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L,
@ -329,11 +332,6 @@ public class SystemSchemaTest extends CalciteTestBase
@Test
public void testSegmentsTable() throws Exception
{
// total segments = 6
// segments 1,2,3 are published and available
// segments 4,5,6 are published but unavailable
// segment 3 is published but not served
// segment 2 is served by 2 servers, so num_replicas=2
final SystemSchema.SegmentsTable segmentsTable = EasyMock
.createMockBuilder(SystemSchema.SegmentsTable.class)
@ -356,9 +354,9 @@ public class SystemSchemaTest extends CalciteTestBase
.anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
//published but unavailable segments
//segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2
final String json = "[{\n"
+ "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ "\t\"dataSource\": \"wikipedia1\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@ -376,7 +374,7 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"size\": 47406,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
+ "}, {\n"
+ "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ "\t\"dataSource\": \"wikipedia2\",\n"
+ "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
+ "\t\"loadSpec\": {\n"
@ -394,7 +392,7 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"size\": 83846,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
+ "}, {\n"
+ "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ "\t\"dataSource\": \"wikipedia3\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@ -411,6 +409,34 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"binaryVersion\": 9,\n"
+ "\t\"size\": 53527,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+ "}, {\n"
+ "\t\"dataSource\": \"test1\",\n"
+ "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
+ "\t\"version\": \"version1\",\n"
+ "\t\"loadSpec\": null,\n"
+ "\t\"dimensions\": \"dim1,dim2\",\n"
+ "\t\"metrics\": \"met1,met2\",\n"
+ "\t\"shardSpec\": {\n"
+ "\t\t\"type\": \"none\",\n"
+ "\t\t\"domainDimensions\": []\n"
+ "\t},\n"
+ "\t\"binaryVersion\": 1,\n"
+ "\t\"size\": 100,\n"
+ "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
+ "}, {\n"
+ "\t\"dataSource\": \"test2\",\n"
+ "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
+ "\t\"version\": \"version2\",\n"
+ "\t\"loadSpec\": null,\n"
+ "\t\"dimensions\": \"dim1,dim2\",\n"
+ "\t\"metrics\": \"met1,met2\",\n"
+ "\t\"shardSpec\": {\n"
+ "\t\t\"type\": \"none\",\n"
+ "\t\t\"domainDimensions\": []\n"
+ "\t},\n"
+ "\t\"binaryVersion\": 1,\n"
+ "\t\"size\": 100,\n"
+ "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
+ "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite);
@ -447,19 +473,137 @@ public class SystemSchemaTest extends CalciteTestBase
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
Assert.assertEquals(6, rows.size());
// total segments = 8
// segment wikipedia1, wikipedia2, wikipedia3 are published but unavailable
// segments test1, test2 are published and available
// segment test3 is served by historical but unpublished or unused
// segments test4, test5 are not published but available (realtime segments)
Assert.assertEquals(8, rows.size());
Object[] row0 = rows.get(0);
//segment 6 is published and unavailable, num_replicas is 0
Assert.assertEquals(1L, row0[9]);
Assert.assertEquals(0L, row0[7]);
//segment 0 is published and unavailable, num_replicas is 0
Assert.assertEquals(
"wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
row0[0]
);
Assert.assertEquals("wikipedia1", row0[1]);
Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]);
Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]);
Assert.assertEquals(47406L, row0[4]);
Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]);
Assert.assertEquals(0L, row0[6]); //partition_num
Assert.assertEquals(0L, row0[7]); //num_replicas
Assert.assertEquals(0L, row0[8]); //numRows = 0
Assert.assertEquals(1L, row0[9]); //is_published
Assert.assertEquals(0L, row0[10]); //is_available
Assert.assertEquals(0L, row0[11]); //is_realtime
Object[] row1 = rows.get(1);
Assert.assertEquals(
"wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
row1[0]
);
Assert.assertEquals("wikipedia2", row1[1]);
Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]);
Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]);
Assert.assertEquals(83846L, row1[4]);
Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]);
Assert.assertEquals(0L, row1[6]); //partition_num
Assert.assertEquals(0L, row1[7]); //num_replicas
Assert.assertEquals(0L, row1[8]); //numRows = 0
Assert.assertEquals(1L, row1[9]); //is_published
Assert.assertEquals(0L, row1[10]); //is_available
Assert.assertEquals(0L, row1[11]); //is_realtime
Object[] row2 = rows.get(2);
Assert.assertEquals(
"wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
row2[0]
);
Assert.assertEquals("wikipedia3", row2[1]);
Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]);
Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]);
Assert.assertEquals(53527L, row2[4]);
Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]);
Assert.assertEquals(0L, row2[6]); //partition_num
Assert.assertEquals(0L, row2[7]); //num_replicas
Assert.assertEquals(0L, row2[8]); //numRows = 0
Assert.assertEquals(1L, row2[9]); //is_published
Assert.assertEquals(0L, row2[10]); //is_available
Assert.assertEquals(0L, row2[11]); //is_realtime
Object[] row3 = rows.get(3);
Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]);
Assert.assertEquals("test1", row3[1]);
Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]);
Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]);
Assert.assertEquals(100L, row3[4]);
Assert.assertEquals("version1", row3[5]);
Assert.assertEquals(0L, row3[6]); //partition_num
Assert.assertEquals(1L, row3[7]); //num_replicas
Assert.assertEquals(3L, row3[8]); //numRows = 3
Assert.assertEquals(1L, row3[9]); //is_published
Assert.assertEquals(1L, row3[10]); //is_available
Assert.assertEquals(0L, row3[11]); //is_realtime
Object[] row4 = rows.get(4);
//segment 2 is published and has 2 replicas
Assert.assertEquals(1L, row4[9]);
Assert.assertEquals(2L, row4[7]);
Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]);
Assert.assertEquals("test2", row4[1]);
Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]);
Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]);
Assert.assertEquals(100L, row4[4]);
Assert.assertEquals("version2", row4[5]);
Assert.assertEquals(0L, row4[6]); //partition_num
Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
Assert.assertEquals(3L, row4[8]); //numRows = 3
Assert.assertEquals(1L, row4[9]); //is_published
Assert.assertEquals(1L, row4[10]); //is_available
Assert.assertEquals(0L, row4[11]); //is_realtime
Object[] row5 = rows.get(5);
//segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]);
Assert.assertEquals("test3", row5[1]);
Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]);
Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]);
Assert.assertEquals(100L, row5[4]);
Assert.assertEquals("version3", row5[5]);
Assert.assertEquals(2L, row5[6]); //partition_num = 2
Assert.assertEquals(1L, row5[7]); //num_replicas
Assert.assertEquals(3L, row5[8]); //numRows = 3
Assert.assertEquals(0L, row5[9]); //is_published
Assert.assertEquals(1L, row5[10]); //is_available
Assert.assertEquals(0L, row5[11]); //is_realtime
Object[] row6 = rows.get(6);
Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]);
Assert.assertEquals("test5", row6[1]);
Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]);
Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]);
Assert.assertEquals(100L, row6[4]);
Assert.assertEquals("version5", row6[5]);
Assert.assertEquals(0L, row6[6]); //partition_num
Assert.assertEquals(1L, row6[7]); //num_replicas
Assert.assertEquals(0L, row6[8]); //numRows = 0
Assert.assertEquals(0L, row6[9]); //is_published
Assert.assertEquals(1L, row6[10]); //is_available
Assert.assertEquals(1L, row6[11]); //is_realtime
Object[] row7 = rows.get(7);
Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]);
Assert.assertEquals("test4", row7[1]);
Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]);
Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]);
Assert.assertEquals(100L, row7[4]);
Assert.assertEquals("version4", row7[5]);
Assert.assertEquals(0L, row7[6]); //partition_num
Assert.assertEquals(1L, row7[7]); //num_replicas
Assert.assertEquals(0L, row7[8]); //numRows
Assert.assertEquals(0L, row7[9]); //is_published
Assert.assertEquals(1L, row7[10]); //is_available
Assert.assertEquals(1L, row7[11]); //is_realtime
// Verify value types.
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
@ -576,7 +720,7 @@ public class SystemSchemaTest extends CalciteTestBase
Object[] row2 = rows.get(2);
Assert.assertEquals("server2:1234", row2[0]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]);
Object[] row3 = rows.get(3);
Assert.assertEquals("server2:1234", row3[0]);

View File

@ -32,6 +32,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@ -49,13 +50,29 @@ public class TestServerInventoryView implements TimelineServerView
"dummy",
0
);
private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
"dummy",
"dummy",
null,
0,
ServerType.REALTIME,
"dummy",
0
);
private final List<DataSegment> segments;
private List<DataSegment> realtimeSegments = new ArrayList<>();
public TestServerInventoryView(List<DataSegment> segments)
{
this.segments = ImmutableList.copyOf(segments);
}
public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
{
this.segments = ImmutableList.copyOf(segments);
this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
}
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
@ -75,7 +92,9 @@ public class TestServerInventoryView implements TimelineServerView
for (final DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
for (final DataSegment segment : realtimeSegments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
}
exec.execute(callback::segmentViewInitialized);
}
@ -85,7 +104,9 @@ public class TestServerInventoryView implements TimelineServerView
for (DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
for (final DataSegment segment : realtimeSegments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
}
exec.execute(callback::timelineInitialized);
}