mirror of https://github.com/apache/druid.git
Remove payload field from table sys.segments (#9883)
* remove payload field from table sys.segments
* update doc
* fix test
* fix CI failure
* add necessary fields
* fix doc
* fix comment
This commit is contained in:
parent 4a625751e8
commit fc555980e8
docs (sys.segments section of the SQL documentation):

@@ -994,7 +994,9 @@ Segments table provides details on all Druid segments, whether they are published
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime). See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
 |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment.|
 |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
-|payload|STRING|JSON-serialized data segment payload|
+|shardSpec|STRING|The toString of specific `ShardSpec`|
+|dimensions|STRING|The dimensions of the segment|
+|metrics|STRING|The metrics of the segment|
 
 For example to retrieve all segments for datasource "wikipedia", use the query:
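The query the doc refers to at this point is unchanged by the commit; presumably it is the standard datasource lookup:

```sql
SELECT * FROM sys.segments WHERE datasource = 'wikipedia'
```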
docs (example sys.segments response), where the serialized payload gives way to the three flattened fields:

@@ -13,6 +13,8 @@
   "is_available": 1,
   "is_realtime": 0,
   "is_overshadowed": 0,
-  "payload": "{\"overshadowed\":false,\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}"
+  "shardSpec": "NoneShardSpec",
+  "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]",
+  "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]"
 }
]
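Consumers that previously parsed the `payload` JSON for shard, dimension, or metric information can select the flattened columns directly. A minimal sketch against the `auth_test` datasource from the sample above, using the column names added in this diff:

```sql
-- Read shard/dimension/metric info without JSON parsing;
-- these columns replace the removed payload field.
SELECT
  "segment_id",
  "shardSpec",   -- toString of the segment's ShardSpec, e.g. 'NoneShardSpec'
  "dimensions",  -- dimension names of the segment
  "metrics"      -- metric names of the segment
FROM sys.segments
WHERE "datasource" = 'auth_test'
LIMIT 10
```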
SystemSchema.java (imports; with the payload serialization gone, JsonProcessingException and RE are no longer needed here):

@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql.calcite.schema;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,7 +55,6 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.http.client.Request;
SystemSchema.java (the segments table row signature):

@@ -143,7 +141,9 @@ public class SystemSchema extends AbstractSchema
       .add("is_available", ValueType.LONG)
       .add("is_realtime", ValueType.LONG)
       .add("is_overshadowed", ValueType.LONG)
-      .add("payload", ValueType.STRING)
+      .add("shardSpec", ValueType.STRING)
+      .add("dimensions", ValueType.STRING)
+      .add("metrics", ValueType.STRING)
       .build();
 
   static final RowSignature SERVERS_SIGNATURE = RowSignature
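With this change the signature has 16 columns (the test below asserts the count). One way to confirm the resulting schema, assuming a standard Druid SQL setup where INFORMATION_SCHEMA is available:

```sql
-- List the columns of sys.segments with their SQL types;
-- shardSpec, dimensions, and metrics should appear, payload should not.
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'sys' AND TABLE_NAME = 'segments'
```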
SystemSchema.java (building rows for published segments; the try/catch around JSON serialization is no longer needed, and the three segment fields are emitted directly):

@@ -294,37 +294,34 @@ public class SystemSchema extends AbstractSchema
     final FluentIterable<Object[]> publishedSegments = FluentIterable
         .from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root))
         .transform(val -> {
-          try {
-            final DataSegment segment = val.getDataSegment();
-            segmentsAlreadySeen.add(segment.getId());
-            final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
-            long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
-            if (partialSegmentData != null) {
-              numReplicas = partialSegmentData.getNumReplicas();
-              numRows = partialSegmentData.getNumRows();
-              isAvailable = partialSegmentData.isAvailable();
-              isRealtime = partialSegmentData.isRealtime();
-            }
-            return new Object[]{
-                segment.getId(),
-                segment.getDataSource(),
-                segment.getInterval().getStart().toString(),
-                segment.getInterval().getEnd().toString(),
-                segment.getSize(),
-                segment.getVersion(),
-                Long.valueOf(segment.getShardSpec().getPartitionNum()),
-                numReplicas,
-                numRows,
-                IS_PUBLISHED_TRUE, //is_published is true for published segments
-                isAvailable,
-                isRealtime,
-                val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
-                jsonMapper.writeValueAsString(val)
-            };
-          }
-          catch (JsonProcessingException e) {
-            throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId());
-          }
+          final DataSegment segment = val.getDataSegment();
+          segmentsAlreadySeen.add(segment.getId());
+          final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
+          long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
+          if (partialSegmentData != null) {
+            numReplicas = partialSegmentData.getNumReplicas();
+            numRows = partialSegmentData.getNumRows();
+            isAvailable = partialSegmentData.isAvailable();
+            isRealtime = partialSegmentData.isRealtime();
+          }
+          return new Object[]{
+              segment.getId(),
+              segment.getDataSource(),
+              segment.getInterval().getStart().toString(),
+              segment.getInterval().getEnd().toString(),
+              segment.getSize(),
+              segment.getVersion(),
+              Long.valueOf(segment.getShardSpec().getPartitionNum()),
+              numReplicas,
+              numRows,
+              IS_PUBLISHED_TRUE, //is_published is true for published segments
+              isAvailable,
+              isRealtime,
+              val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
+              segment.getShardSpec(),
+              segment.getDimensions(),
+              segment.getMetrics()
+          };
         });
 
     final FluentIterable<Object[]> availableSegments = FluentIterable
SystemSchema.java (building rows for available, i.e. served-but-unpublished, segments; same treatment):

@@ -333,33 +330,30 @@ public class SystemSchema extends AbstractSchema
             root
         ))
         .transform(val -> {
-          try {
-            if (segmentsAlreadySeen.contains(val.getKey())) {
-              return null;
-            }
-            final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
-            final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
-            return new Object[]{
-                val.getKey(),
-                val.getKey().getDataSource(),
-                val.getKey().getInterval().getStart().toString(),
-                val.getKey().getInterval().getEnd().toString(),
-                val.getValue().getSegment().getSize(),
-                val.getKey().getVersion(),
-                (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
-                numReplicas,
-                val.getValue().getNumRows(),
-                IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
-                // is_available is assumed to be always true for segments announced by historicals or realtime tasks
-                IS_AVAILABLE_TRUE,
-                val.getValue().isRealtime(),
-                IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
-                jsonMapper.writeValueAsString(val.getKey())
-            };
-          }
-          catch (JsonProcessingException e) {
-            throw new RE(e, "Error getting segment payload for segment %s", val.getKey());
-          }
+          if (segmentsAlreadySeen.contains(val.getKey())) {
+            return null;
+          }
+          final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
+          final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
+          return new Object[]{
+              val.getKey(),
+              val.getKey().getDataSource(),
+              val.getKey().getInterval().getStart().toString(),
+              val.getKey().getInterval().getEnd().toString(),
+              val.getValue().getSegment().getSize(),
+              val.getKey().getVersion(),
+              (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
+              numReplicas,
+              val.getValue().getNumRows(),
+              IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
+              // is_available is assumed to be always true for segments announced by historicals or realtime tasks
+              IS_AVAILABLE_TRUE,
+              val.getValue().isRealtime(),
+              IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
+              val.getValue().getSegment().getShardSpec(),
+              val.getValue().getSegment().getDimensions(),
+              val.getValue().getSegment().getMetrics()
+          };
         });
 
     final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
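As before, a published segment with no entry in `partialSegmentDataMap` reports zeroed liveness stats, which makes published-but-unserved segments easy to spot. A sketch using the columns above:

```sql
-- Published segments not currently served by any process:
-- num_replicas and num_rows default to 0 when no liveness data exists.
SELECT "segment_id", "num_replicas", "num_rows"
FROM sys.segments
WHERE is_published = 1 AND is_available = 0
```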
SystemSchemaTest.java (the field count grows from 14 to 16, and the new columns are backed by ShardSpec and List objects):

@@ -91,6 +91,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -472,7 +473,7 @@ public class SystemSchemaTest extends CalciteTestBase
     final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
 
-    Assert.assertEquals(14, fields.size());
+    Assert.assertEquals(16, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@@ -1248,6 +1249,10 @@ public class SystemSchemaTest extends CalciteTestBase
       case STRING:
         if (signature.getColumnName(i).equals("segment_id")) {
          expectedClass = SegmentId.class;
+        } else if (signature.getColumnName(i).equals("shardSpec")) {
+          expectedClass = ShardSpec.class;
+        } else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) {
+          expectedClass = List.class;
        } else {
          expectedClass = String.class;
        }
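Although the backing row objects are `ShardSpec` and `List` instances, the signature declares all three columns STRING, so through SQL they should behave as ordinary strings. A sketch, reusing the `NoneShardSpec` value from the doc sample above:

```sql
-- The new columns are declared STRING, so plain string predicates work.
SELECT "segment_id"
FROM sys.segments
WHERE "shardSpec" = 'NoneShardSpec'
LIMIT 5
```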
web-console segments-view (the console drops payload from its result type, its queries, and its client-side JSON parsing):

@@ -146,7 +146,6 @@ interface SegmentQueryResultRow {
   version: string;
   size: 0;
   partition_num: number;
-  payload: any;
   num_rows: number;
   num_replicas: number;
   is_available: number;
@@ -211,6 +210,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState>
         `FROM sys.segments`,
         whereClause ? `WHERE ${whereClause}` : '',
         `GROUP BY 1`,
+        `ORDER BY 1 DESC`,
         `LIMIT ${totalQuerySize}`,
       ]).join('\n');
 
@@ -221,7 +221,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState>
       queryParts = compact([
         `SELECT`,
         ` ("start" || '/' || "end") AS "interval",`,
-        ` "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed", "payload"`,
+        ` "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed"`,
        `FROM sys.segments`,
        `WHERE`,
        intervals ? ` ("start" || '/' || "end") IN (${intervals})` : 'FALSE',
@@ -240,7 +240,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState>
       queryParts.push(`LIMIT ${totalQuerySize * 1000}`);
     } else {
       queryParts = [
-        `SELECT "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed", "payload"`,
+        `SELECT "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed"`,
        `FROM sys.segments`,
      ];
@@ -264,13 +264,6 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState>
       const results: any[] = (await queryDruidSql({ query: sqlQuery })).slice(
         query.page * query.pageSize,
       );
-      results.forEach(result => {
-        try {
-          result.payload = JSON.parse(result.payload);
-        } catch {
-          result.payload = {};
-        }
-      });
      return results;
    },
    onStateChange: ({ result, loading, error }) => {
@@ -299,7 +292,6 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState>
         version: segment.version,
         partition_num: segment.shardSpec.partitionNum ? 0 : segment.shardSpec.partitionNum,
         size: segment.size,
-        payload: segment,
         num_rows: -1,
         num_replicas: -1,
         is_available: -1,
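Like the console, any client that read `payload` must switch to the explicit columns. For example, the "should be published" filter called out in the doc table above needs no payload inspection at all:

```sql
-- Segments that "should be published": published and not fully
-- overshadowed (filter quoted from the is_overshadowed doc row).
SELECT "segment_id", "datasource", "version"
FROM sys.segments
WHERE is_published = 1 AND is_overshadowed = 0
```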