Rename SegmentMetadataHolder to AvailableSegmentMetadata (#7372)

This commit is contained in:
Surekha 2019-04-14 10:19:48 -07:00 committed by Gian Merlino
parent 9b5c216684
commit 3e5dae9b96
4 changed files with 89 additions and 98 deletions

View File

@ -27,28 +27,25 @@ import java.util.Set;
/**
* Immutable representation of RowSignature and other segment attributes needed by {@link SystemSchema.SegmentsTable}.
* This class contains the metadata of segments announced by historicals or ingestion tasks.
*/
public class SegmentMetadataHolder
public class AvailableSegmentMetadata
{
/**
 * Static factory for a {@link Builder} seeded with the given segment attributes.
 * Every argument is forwarded unchanged to the Builder constructor.
 * The {@code long} flag arguments are booleans encoded as 1 = true and 0 = false.
 *
 * @param segmentId      id of the segment the metadata describes
 * @param isRealtime     long-encoded boolean, 1 if the segment is realtime, 0 otherwise
 * @param segmentServers set of servers that contain the segment
 * @param rowSignature   row signature of the segment; callers may pass null when it is not known yet
 * @param numRows        number of rows in the segment
 * @return a new Builder holding the supplied values
 */
public static Builder builder(
SegmentId segmentId,
long isPublished,
long isAvailable,
long isRealtime,
Set<String> segmentServers,
RowSignature rowSignature,
long numRows
)
{
return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
return new Builder(segmentId, isRealtime, segmentServers, rowSignature, numRows);
}
public static Builder from(SegmentMetadataHolder h)
public static Builder from(AvailableSegmentMetadata h)
{
return new Builder(
h.getSegmentId(),
h.isPublished(),
h.isAvailable(),
h.isRealtime(),
h.getReplicas(),
h.getRowSignature(),
@ -58,10 +55,7 @@ public class SegmentMetadataHolder
private final SegmentId segmentId;
// Booleans represented as long type, where 1 = true and 0 = false
// to make it easy to count number of segments which are
// published, available or realtime etc.
private final long isPublished;
private final long isAvailable;
// to make it easy to count number of segments which are realtime
private final long isRealtime;
// set of servers that contain the segment
private final Set<String> segmentServers;
@ -69,27 +63,15 @@ public class SegmentMetadataHolder
@Nullable
private final RowSignature rowSignature;
/**
 * Copies each attribute held by the given builder into the corresponding field of this instance.
 * The constructor is private; instances are created through {@link Builder#build()}.
 *
 * @param builder builder whose current values are copied
 */
private SegmentMetadataHolder(Builder builder)
private AvailableSegmentMetadata(Builder builder)
{
this.rowSignature = builder.rowSignature;
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
this.segmentServers = builder.segmentServers;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
/**
 * Returns the published flag as a long, where 1 = true and 0 = false.
 */
public long isPublished()
{
return isPublished;
}
/**
 * Returns the available flag as a long, where 1 = true and 0 = false.
 */
public long isAvailable()
{
return isAvailable;
}
public long isRealtime()
{
return isRealtime;
@ -124,8 +106,6 @@ public class SegmentMetadataHolder
public static class Builder
{
private final SegmentId segmentId;
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
private Set<String> segmentServers;
@ -135,8 +115,6 @@ public class SegmentMetadataHolder
private Builder(
SegmentId segmentId,
long isPublished,
long isAvailable,
long isRealtime,
Set<String> servers,
RowSignature rowSignature,
@ -144,8 +122,6 @@ public class SegmentMetadataHolder
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
this.segmentServers = servers;
this.rowSignature = rowSignature;
@ -170,9 +146,9 @@ public class SegmentMetadataHolder
return this;
}
/**
 * Creates the metadata object from the values currently held by this builder.
 *
 * @return a new instance constructed from this builder
 */
public SegmentMetadataHolder build()
public AvailableSegmentMetadata build()
{
return new SegmentMetadataHolder(this);
return new AvailableSegmentMetadata(this);
}
}

View File

@ -95,8 +95,6 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private static final long DEFAULT_IS_PUBLISHED = 0;
private static final long DEFAULT_IS_AVAILABLE = 1;
private static final long DEFAULT_NUM_ROWS = 0;
private final QueryLifecycleFactory queryLifecycleFactory;
@ -111,10 +109,10 @@ public class DruidSchema extends AbstractSchema
// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
private final Object lock = new Object();
// DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
// DataSource -> Segment -> AvailableSegmentMetadata(contains RowSignature) for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
@GuardedBy("lock")
private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
private final Map<String, TreeMap<DataSegment, AvailableSegmentMetadata>> segmentMetadataInfo = new HashMap<>();
private int totalSegments = 0;
// All mutable segments.
@ -356,24 +354,22 @@ public class DruidSchema extends AbstractSchema
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
SegmentMetadataHolder holder = knownSegments != null ? knownSegments.get(segment) : null;
if (holder == null) {
final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment) : null;
if (segmentMetadata == null) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
final Set<String> servers = ImmutableSet.of(server.getName());
holder = SegmentMetadataHolder.builder(
segmentMetadata = AvailableSegmentMetadata.builder(
segment.getId(),
DEFAULT_IS_PUBLISHED,
DEFAULT_IS_AVAILABLE,
isRealtime,
servers,
null,
DEFAULT_NUM_ROWS
).build();
// Unknown segment.
setSegmentMetadataHolder(segment, holder);
setAvailableSegmentMetadata(segment, segmentMetadata);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getId());
@ -382,16 +378,16 @@ public class DruidSchema extends AbstractSchema
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
final Set<String> segmentServers = holder.getReplicas();
final Set<String> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
.addAll(segmentServers)
.add(server.getName())
.build();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
knownSegments.put(segment, metadataWithNumReplicas);
if (server.segmentReplicatable()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
@ -417,7 +413,7 @@ public class DruidSchema extends AbstractSchema
segmentsNeedingRefresh.remove(segment);
mutableSegments.remove(segment);
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
segmentMetadataInfo.get(segment.getDataSource());
if (dataSourceSegments.remove(segment) != null) {
totalSegments--;
@ -437,17 +433,17 @@ public class DruidSchema extends AbstractSchema
{
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final SegmentMetadataHolder holder = knownSegments.get(segment);
final Set<String> segmentServers = holder.getReplicas();
final Map<DataSegment, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment);
final Set<String> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server.getName())))
.toSet();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
knownSegments.put(segment, metadataWithNumReplicas);
lock.notifyAll();
}
}
@ -511,25 +507,25 @@ public class DruidSchema extends AbstractSchema
synchronized (lock) {
final RowSignature rowSignature = analysisToRowSignature(analysis);
log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
final Map<DataSegment, AvailableSegmentMetadata> dataSourceSegments =
segmentMetadataInfo.get(segment.getDataSource());
if (dataSourceSegments == null) {
log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
} else {
SegmentMetadataHolder holder = dataSourceSegments.get(segment);
if (holder == null) {
final AvailableSegmentMetadata segmentMetadata = dataSourceSegments.get(segment);
if (segmentMetadata == null) {
log.warn(
"No segment[%s] found, skipping refresh",
segment.getId()
);
} else {
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
.from(holder)
final AvailableSegmentMetadata updatedSegmentMetadata = AvailableSegmentMetadata
.from(segmentMetadata)
.withRowSignature(rowSignature)
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentMetadataHolder(segment, updatedHolder);
dataSourceSegments.put(segment, updatedSegmentMetadata);
setAvailableSegmentMetadata(segment, updatedSegmentMetadata);
retVal.add(segment);
}
}
@ -555,14 +551,14 @@ public class DruidSchema extends AbstractSchema
}
@VisibleForTesting
void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
void setAvailableSegmentMetadata(final DataSegment segment, final AvailableSegmentMetadata availableSegmentMetadata)
{
synchronized (lock) {
TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
TreeMap<DataSegment, AvailableSegmentMetadata> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
segment.getDataSource(),
x -> new TreeMap<>(SEGMENT_ORDER)
);
if (dataSourceSegments.put(segment, segmentMetadataHolder) == null) {
if (dataSourceSegments.put(segment, availableSegmentMetadata) == null) {
totalSegments++;
}
}
@ -571,12 +567,12 @@ public class DruidSchema extends AbstractSchema
private DruidTable buildDruidTable(final String dataSource)
{
synchronized (lock) {
final Map<DataSegment, SegmentMetadataHolder> segmentMap = segmentMetadataInfo.get(dataSource);
final Map<DataSegment, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
final Map<String, ValueType> columnTypes = new TreeMap<>();
if (segmentMap != null) {
for (SegmentMetadataHolder segmentMetadataHolder : segmentMap.values()) {
final RowSignature rowSignature = segmentMetadataHolder.getRowSignature();
for (AvailableSegmentMetadata availableSegmentMetadata : segmentMap.values()) {
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) {
for (String column : rowSignature.getRowOrder()) {
// Newer column types should override older ones.
@ -647,11 +643,11 @@ public class DruidSchema extends AbstractSchema
return rowSignatureBuilder.build();
}
Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
Map<DataSegment, AvailableSegmentMetadata> getSegmentMetadata()
{
final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
final Map<DataSegment, AvailableSegmentMetadata> segmentMetadata = new HashMap<>();
synchronized (lock) {
for (TreeMap<DataSegment, SegmentMetadataHolder> val : segmentMetadataInfo.values()) {
for (TreeMap<DataSegment, AvailableSegmentMetadata> val : segmentMetadataInfo.values()) {
segmentMetadata.putAll(val);
}
}

View File

@ -179,6 +179,9 @@ public class SystemSchema extends AbstractSchema
return tableMap;
}
/**
* This table contains one row per segment, covering segments from the metadata store as well as served segments.
*/
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
@ -186,6 +189,14 @@ public class SystemSchema extends AbstractSchema
private final AuthorizerMapper authorizerMapper;
private final MetadataSegmentView metadataView;
/**
* Boolean constants used for available segments, represented as long type
* where 1 = true and 0 = false, to make it easy to count the number of segments
* which are published or available.
*/
private static final long DEFAULT_IS_PUBLISHED = 0;
private static final long DEFAULT_IS_AVAILABLE = 1;
public SegmentsTable(
DruidSchema druidSchemna,
MetadataSegmentView metadataView,
@ -215,27 +226,27 @@ public class SystemSchema extends AbstractSchema
public Enumerable<Object[]> scan(DataContext root)
{
//get available segments from druidSchema
final Map<DataSegment, SegmentMetadataHolder> availableSegmentMetadata = druidSchema.getSegmentMetadata();
final Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries =
final Map<DataSegment, AvailableSegmentMetadata> availableSegmentMetadata = druidSchema.getSegmentMetadata();
final Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries =
availableSegmentMetadata.entrySet().iterator();
// in memory map to store segment data from available segments
final Map<SegmentId, PartialSegmentData> partialSegmentDataMap =
Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
PartialSegmentData partialSegmentData =
new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
new PartialSegmentData(DEFAULT_IS_AVAILABLE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}
//get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
final Iterator<DataSegment> metadataSegments = metadataView.getPublishedSegments();
final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();
final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
final FluentIterable<Object[]> publishedSegments = FluentIterable
.from(() -> getAuthorizedPublishedSegments(
metadataSegments,
metadataStoreSegments,
root
))
.transform(val -> {
@ -292,8 +303,8 @@ public class SystemSchema extends AbstractSchema
Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
numReplicas,
val.getValue().getNumRows(),
val.getValue().isPublished(),
val.getValue().isAvailable(),
DEFAULT_IS_PUBLISHED,
DEFAULT_IS_AVAILABLE,
val.getValue().isRealtime(),
jsonMapper.writeValueAsString(val.getKey())
};
@ -331,18 +342,18 @@ public class SystemSchema extends AbstractSchema
return authorizedSegments.iterator();
}
private Iterator<Entry<DataSegment, SegmentMetadataHolder>> getAuthorizedAvailableSegments(
Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries,
private Iterator<Entry<DataSegment, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
Iterator<Entry<DataSegment, AvailableSegmentMetadata>> availableSegmentEntries,
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
Function<Entry<DataSegment, SegmentMetadataHolder>, Iterable<ResourceAction>> raGenerator = segment -> Collections
Function<Entry<DataSegment, AvailableSegmentMetadata>, Iterable<ResourceAction>> raGenerator = segment -> Collections
.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource()));
final Iterable<Entry<DataSegment, SegmentMetadataHolder>> authorizedSegments =
final Iterable<Entry<DataSegment, AvailableSegmentMetadata>> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
() -> availableSegmentEntries,
@ -396,6 +407,10 @@ public class SystemSchema extends AbstractSchema
}
}
/**
* This table contains one row per server. At this time it only contains the
* data servers (i.e. historicals and peons).
*/
static class ServersTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
@ -449,6 +464,9 @@ public class SystemSchema extends AbstractSchema
}
}
/**
* This table contains one row per segment per server.
*/
static class ServerSegmentsTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
@ -490,6 +508,9 @@ public class SystemSchema extends AbstractSchema
}
}
/**
* This table contains one row per task.
*/
static class TasksTable extends AbstractTable implements ScannableTable
{
private final DruidLeaderClient druidLeaderClient;

View File

@ -247,14 +247,14 @@ public class DruidSchemaTest extends CalciteTestBase
}
/**
* This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case
* This tests that {@link AvailableSegmentMetadata#getNumRows()} is correct in case
* of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
* is called more than once for same segment
*/
@Test
public void testSegmentMetadataHolderNumRows()
{
Map<DataSegment, SegmentMetadataHolder> segmentsMetadata = schema.getSegmentMetadata();
Map<DataSegment, AvailableSegmentMetadata> segmentsMetadata = schema.getSegmentMetadata();
final Set<DataSegment> segments = segmentsMetadata.keySet();
Assert.assertEquals(3, segments.size());
// find the only segment with datasource "foo2"
@ -263,10 +263,10 @@ public class DruidSchemaTest extends CalciteTestBase
.findFirst()
.orElse(null);
Assert.assertNotNull(existingSegment);
final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment);
// update SegmentMetadataHolder of existingSegment with numRows=5
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
final AvailableSegmentMetadata existingMetadata = segmentsMetadata.get(existingSegment);
// update AvailableSegmentMetadata of existingSegment with numRows=5
AvailableSegmentMetadata updatedMetadata = AvailableSegmentMetadata.from(existingMetadata).withNumRows(5).build();
schema.setAvailableSegmentMetadata(existingSegment, updatedMetadata);
// find a druidServer holding existingSegment
final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
.stream()
@ -289,19 +289,17 @@ public class DruidSchemaTest extends CalciteTestBase
.filter(segment -> segment.getDataSource().equals("foo2"))
.findFirst()
.orElse(null);
final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment);
Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId());
Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows());
final AvailableSegmentMetadata currentMetadata = segmentsMetadata.get(currentSegment);
Assert.assertEquals(updatedMetadata.getSegmentId(), currentMetadata.getSegmentId());
Assert.assertEquals(updatedMetadata.getNumRows(), currentMetadata.getNumRows());
// the number of replicas does not change here, since addSegment is called with the same server that was already serving existingSegment
Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas());
Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable());
Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished());
Assert.assertEquals(updatedMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
}
@Test
public void testNullDatasource() throws IOException
{
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
// segments contains two segments with datasource "foo" and one with datasource "foo2"
@ -319,9 +317,9 @@ public class DruidSchemaTest extends CalciteTestBase
}
@Test
public void testNullSegmentMetadataHolder() throws IOException
public void testNullAvailableSegmentMetadata() throws IOException
{
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Map<DataSegment, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
// remove one of the segments with datasource "foo"
@ -331,7 +329,7 @@ public class DruidSchemaTest extends CalciteTestBase
.orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from
schema.refreshSegments(segments); // can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
segmentMetadatas = schema.getSegmentMetadata();
segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 2);