Add check for nullable numRows (#6460)

* Add check for nullable numRows

* Make numRows long instead of Long type

* Add check for numRows in unit test
* small refactoring

* Modify test

PR comment from https://github.com/apache/incubator-druid/pull/6094#pullrequestreview-163937783

* Add a test for serverSegments table

* update tests
This commit is contained in:
Surekha 2018-10-13 15:08:42 -07:00 committed by Jonathan Wei
parent b06ac54a5e
commit e908fd6db7
3 changed files with 100 additions and 41 deletions

View File

@ -36,10 +36,9 @@ public class SegmentMetadataHolder
private final long isRealtime; private final long isRealtime;
private final String segmentId; private final String segmentId;
private final long numReplicas; private final long numReplicas;
private final long numRows;
@Nullable @Nullable
private final RowSignature rowSignature; private final RowSignature rowSignature;
@Nullable
private final Long numRows;
private SegmentMetadataHolder(Builder builder) private SegmentMetadataHolder(Builder builder)
{ {
@ -77,8 +76,7 @@ public class SegmentMetadataHolder
return numReplicas; return numReplicas;
} }
@Nullable public long getNumRows()
public Long getNumRows()
{ {
return numRows; return numRows;
} }
@ -99,8 +97,7 @@ public class SegmentMetadataHolder
private long numReplicas; private long numReplicas;
@Nullable @Nullable
private RowSignature rowSignature; private RowSignature rowSignature;
@Nullable private long numRows;
private Long numRows;
public Builder( public Builder(
String segmentId, String segmentId,
@ -123,7 +120,7 @@ public class SegmentMetadataHolder
return this; return this;
} }
public Builder withNumRows(Long numRows) public Builder withNumRows(long numRows)
{ {
this.numRows = numRows; this.numRows = numRows;
return this; return this;

View File

@ -235,6 +235,13 @@ public class SystemSchema extends AbstractSchema
try { try {
segmentsAlreadySeen.add(val.getIdentifier()); segmentsAlreadySeen.add(val.getIdentifier());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier()); final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
if (partialSegmentData != null) {
numReplicas = partialSegmentData.getNumReplicas();
numRows = partialSegmentData.getNumRows();
isAvailable = partialSegmentData.isAvailable();
isRealtime = partialSegmentData.isRealtime();
}
return new Object[]{ return new Object[]{
val.getIdentifier(), val.getIdentifier(),
val.getDataSource(), val.getDataSource(),
@ -243,11 +250,11 @@ public class SystemSchema extends AbstractSchema
val.getSize(), val.getSize(),
val.getVersion(), val.getVersion(),
val.getShardSpec().getPartitionNum(), val.getShardSpec().getPartitionNum(),
partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas(), numReplicas,
partialSegmentData == null ? 0L : partialSegmentData.getNumRows(), numRows,
1L, //is_published is true for published segments 1L, //is_published is true for published segments
partialSegmentData == null ? 1L : partialSegmentData.isAvailable(), isAvailable,
partialSegmentData == null ? 0L : partialSegmentData.isRealtime(), isRealtime,
jsonMapper.writeValueAsString(val) jsonMapper.writeValueAsString(val)
}; };
} }
@ -340,13 +347,13 @@ public class SystemSchema extends AbstractSchema
private final long isAvailable; private final long isAvailable;
private final long isRealtime; private final long isRealtime;
private final long numReplicas; private final long numReplicas;
private final Long numRows; private final long numRows;
public PartialSegmentData( public PartialSegmentData(
final long isAvailable, final long isAvailable,
final long isRealtime, final long isRealtime,
final long numReplicas, final long numReplicas,
final Long numRows final long numRows
) )
{ {
@ -371,7 +378,7 @@ public class SystemSchema extends AbstractSchema
return numReplicas; return numReplicas;
} }
public Long getNumRows() public long getNumRows()
{ {
return numRows; return numRows;
} }
@ -466,7 +473,7 @@ public class SystemSchema extends AbstractSchema
} }
} }
private static class ServerSegmentsTable extends AbstractTable implements ScannableTable static class ServerSegmentsTable extends AbstractTable implements ScannableTable
{ {
private final TimelineServerView serverView; private final TimelineServerView serverView;
final AuthorizerMapper authorizerMapper; final AuthorizerMapper authorizerMapper;

View File

@ -34,24 +34,19 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table; import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer; import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.apache.druid.java.util.http.client.response.FullResponseHolder; import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@ -64,7 +59,6 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.AuthorizerMapper;
@ -284,25 +278,6 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT DataSegment.PruneLoadSpecHolder.DEFAULT
); );
private final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
private final DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
"foo",
new NoopServiceEmitter()
);
private final DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
"foo2",
new NoopServiceEmitter()
);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer( private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L, 1L,
@ -314,7 +289,7 @@ public class SystemSchemaTest extends CalciteTestBase
new DruidServerMetadata("server2", "server2:1234", null, 5L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), new DruidServerMetadata("server2", "server2:1234", null, 5L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
1L, 1L,
null, null,
ImmutableMap.of("segment2", segment2, "segment4", segment4, "segment5", segment5) ImmutableMap.of("segment3", segment3, "segment4", segment4, "segment5", segment5)
); );
private final List<ImmutableDruidServer> immutableDruidServers = ImmutableList.of(druidServer1, druidServer2); private final List<ImmutableDruidServer> immutableDruidServers = ImmutableList.of(druidServer1, druidServer2);
@ -466,6 +441,7 @@ public class SystemSchemaTest extends CalciteTestBase
//segment 6 is published and unavailable, num_replicas is 0 //segment 6 is published and unavailable, num_replicas is 0
Assert.assertEquals(1L, row1[9]); Assert.assertEquals(1L, row1[9]);
Assert.assertEquals(0L, row1[7]); Assert.assertEquals(0L, row1[7]);
Assert.assertEquals(0L, row1[8]); //numRows = 0
Assert.assertEquals(true, enumerator.moveNext()); Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext()); Assert.assertEquals(true, enumerator.moveNext());
@ -476,6 +452,7 @@ public class SystemSchemaTest extends CalciteTestBase
//segment 2 is published and has 2 replicas //segment 2 is published and has 2 replicas
Assert.assertEquals(1L, row5[9]); Assert.assertEquals(1L, row5[9]);
Assert.assertEquals(2L, row5[7]); Assert.assertEquals(2L, row5[7]);
Assert.assertEquals(3L, row5[8]); //numRows = 3
Assert.assertEquals(true, enumerator.moveNext()); Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(false, enumerator.moveNext()); Assert.assertEquals(false, enumerator.moveNext());
@ -528,6 +505,84 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals("historical", row2[4].toString()); Assert.assertEquals("historical", row2[4].toString());
} }
@Test
public void testServerSegmentsTable()
{
SystemSchema.ServerSegmentsTable serverSegmentsTable = EasyMock.createMockBuilder(SystemSchema.ServerSegmentsTable.class)
.withConstructor(serverView, authMapper)
.createMock();
EasyMock.replay(serverSegmentsTable);
EasyMock.expect(serverView.getDruidServers())
.andReturn(immutableDruidServers)
.once();
EasyMock.replay(serverView);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
Enumerable<Object[]> rows = serverSegmentsTable.scan(dataContext);
Assert.assertEquals(5, rows.count());
//server_segments table is the join of servers and segments table
// it will have 5 rows as follows
// localhost:0000 | test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1(segment1)
// localhost:0000 | test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2(segment2)
// server2:1234 | test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3(segment3)
// server2:1234 | test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4)
// server2:1234 | test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5)
Enumerator<Object[]> enumerator = rows.enumerator();
Assert.assertEquals(true, enumerator.moveNext());
Object[] row1 = rows.first();
Assert.assertEquals("localhost:0000", row1[0]);
Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row2 = enumerator.current();
Assert.assertEquals("localhost:0000", row2[0]);
Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row3 = enumerator.current();
Assert.assertEquals("server2:1234", row3[0]);
Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row4 = enumerator.current();
Assert.assertEquals("server2:1234", row4[0]);
Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]);
Assert.assertEquals(true, enumerator.moveNext());
Object[] row5 = rows.last();
Assert.assertEquals("server2:1234", row5[0]);
Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]);
Assert.assertEquals(false, enumerator.moveNext());
}
@Test @Test
public void testTasksTable() throws Exception public void testTasksTable() throws Exception
{ {