Fix num_replicas count in sys.segments table (#6804)

* Fix num_replicas count from sys.segments

* Adjust unit test for num_replica > 1

* Pass named arguments instead of passing boolean constants

* Address PR comments

* PR comments
Surekha 2019-01-15 08:31:29 -08:00 committed by Fangjin Yang
parent 9a8bade2fb
commit f72f33f84a
7 changed files with 90 additions and 21 deletions
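At its core, the fix replaces the single numeric replica count that DruidSchema used to keep per segment with a map from segment id to the set of servers currently serving that segment; num_replicas is then derived as the size of that set, so a segment served by both a historical and a realtime server counts as 2. A minimal standalone sketch of that bookkeeping, using plain Java collections and illustrative names rather than the actual Druid classes:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the replica bookkeeping idea behind this change: track which
// servers announce each segment and derive num_replicas from the set size.
// All names here are illustrative, not Druid's.
public class ReplicaCountSketch
{
  private final Map<String, Set<String>> serversBySegment = new HashMap<>();

  public void serverAddedSegment(String serverName, String segmentId)
  {
    serversBySegment.computeIfAbsent(segmentId, id -> new HashSet<>()).add(serverName);
  }

  public void serverRemovedSegment(String serverName, String segmentId)
  {
    final Set<String> servers = serversBySegment.get(segmentId);
    if (servers != null) {
      servers.remove(serverName);
    }
  }

  public long getNumReplicas(String segmentId)
  {
    final Set<String> servers = serversBySegment.get(segmentId);
    return servers == null ? 0 : servers.size();
  }

  public static void main(String[] args)
  {
    final ReplicaCountSketch sketch = new ReplicaCountSketch();
    sketch.serverAddedSegment("historical-1", "segment-a");
    sketch.serverAddedSegment("realtime-1", "segment-a");
    // A segment served by a historical and a realtime server counts as 2 replicas.
    System.out.println(sketch.getNumReplicas("segment-a")); // 2
    sketch.serverRemovedSegment("realtime-1", "segment-a");
    System.out.println(sketch.getNumReplicas("segment-a")); // 1
  }
}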

BrokerServerView.java

@ -277,6 +277,8 @@ public class BrokerServerView implements TimelineServerView
server,
segmentId
);
} else {
runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
}
if (selector.isEmpty()) {

TimelineServerView.java

@ -81,5 +81,16 @@ public interface TimelineServerView extends ServerView
* @return continue or unregister
*/
CallbackAction segmentRemoved(DataSegment segment);
/**
* Called when a segment is removed from a server. Note that the timeline can still have the segment, even though it's removed from given server.
* {@link #segmentRemoved(DataSegment)} is the authority on when segment is removed from the timeline.
*
* @param server The server that removed a segment
* @param segment The segment that was removed
*
* @return continue or unregister
*/
CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment);
}
}
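The new callback complements rather than replaces the existing one: serverSegmentRemoved fires when a single server stops serving a segment that other servers may still serve, while segmentRemoved remains the signal that the segment has left the timeline entirely. A compact illustration of why the distinction matters for a metadata consumer, using hypothetical types rather than the actual TimelineServerView interfaces:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (hypothetical types, not Druid's): dropping one replica
// must not erase the segment's metadata while other servers still serve it.
public class CallbackSketch
{
  interface Callback
  {
    void segmentRemoved(String segmentId);                           // gone from the timeline
    void serverSegmentRemoved(String serverName, String segmentId);  // gone from one server only
  }

  public static void main(String[] args)
  {
    final Map<String, Set<String>> serversBySegment = new HashMap<>();
    serversBySegment.put("segment-a", new HashSet<>(Set.of("historical-1", "realtime-1")));

    final Callback callback = new Callback()
    {
      @Override
      public void segmentRemoved(String segmentId)
      {
        serversBySegment.remove(segmentId); // drop all metadata for the segment
      }

      @Override
      public void serverSegmentRemoved(String serverName, String segmentId)
      {
        final Set<String> servers = serversBySegment.get(segmentId);
        if (servers != null) {
          servers.remove(serverName); // drop one replica, keep the rest
        }
      }
    };

    callback.serverSegmentRemoved("realtime-1", "segment-a");
    System.out.println(serversBySegment.get("segment-a")); // [historical-1]
  }
}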

DruidSchema.java

@ -20,8 +20,11 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@ -88,6 +91,8 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private static final long IS_PUBLISHED = 0;
private static final long IS_AVAILABLE = 1;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
@ -168,6 +173,16 @@ public class DruidSchema extends AbstractSchema
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction serverSegmentRemoved(
final DruidServerMetadata server,
final DataSegment segment
)
{
removeServerSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
@ -338,12 +353,18 @@ public class DruidSchema extends AbstractSchema
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
final Map<String, Set<String>> serverSegmentMap = ImmutableMap.of(
segment.getIdentifier(),
ImmutableSet.of(server.getName())
);
final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
segment.getIdentifier(),
0,
1,
IS_PUBLISHED,
IS_AVAILABLE,
isRealtime,
1
serverSegmentMap
).build();
// Unknown segment.
setSegmentSignature(segment, holder);
@ -357,13 +378,18 @@ public class DruidSchema extends AbstractSchema
} else {
if (knownSegments.containsKey(segment)) {
final SegmentMetadataHolder holder = knownSegments.get(segment);
final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
.addAll(segmentServerMap.get(segment.getIdentifier()))
.add(server.getName())
.build();
final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
holder.getSegmentId(),
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
holder.getNumReplicas()
).withNumReplicas(holder.getNumReplicas() + 1).build();
holder.getReplicas()
).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
knownSegments.put(segment, holderWithNumReplicas);
}
if (server.segmentReplicatable()) {
@ -403,6 +429,28 @@ public class DruidSchema extends AbstractSchema
}
}
private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getIdentifier(), server.getName());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final SegmentMetadataHolder holder = knownSegments.get(segment);
final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getIdentifier()))
.filter(Predicates.not(Predicates.equalTo(server.getName())))
.toSet();
final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
holder.getSegmentId(),
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
holder.getReplicas()
).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
knownSegments.put(segment, holderWithNumReplicas);
lock.notifyAll();
}
}
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
@ -475,7 +523,7 @@ public class DruidSchema extends AbstractSchema
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
holder.getNumReplicas()
holder.getReplicas()
).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentSignature(segment, updatedHolder);
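Because SegmentMetadataHolder is immutable, both DruidSchema paths above rebuild the per-segment server set instead of mutating it: adding a replica copies the existing names plus the new server, and removing one filters the departing server out. A condensed sketch of just those two set operations, using the same Guava utilities the diff relies on (server names are illustrative):

import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;

public class ImmutableServerSetSketch
{
  public static void main(String[] args)
  {
    final ImmutableSet<String> servers = ImmutableSet.of("historical-1");

    // Adding a replica: copy the existing names and append the new server.
    final ImmutableSet<String> afterAdd = new ImmutableSet.Builder<String>()
        .addAll(servers)
        .add("realtime-1")
        .build();

    // Removing a replica: keep every name except the departing server.
    final ImmutableSet<String> afterRemove = FluentIterable.from(afterAdd)
        .filter(Predicates.not(Predicates.equalTo("realtime-1")))
        .toSet();

    System.out.println(afterAdd);    // [historical-1, realtime-1]
    System.out.println(afterRemove); // [historical-1]
  }
}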

SegmentMetadataHolder.java

@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite.schema;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
/**
* Immutable representation of RowSignature and other segment attributes needed by {@link SystemSchema.SegmentsTable}
@ -36,7 +38,8 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
private final String segmentId;
private final long numReplicas;
//segmentId -> set of servers that contain the segment
private final Map<String, Set<String>> segmentServerMap;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
@ -47,7 +50,7 @@ public class SegmentMetadataHolder
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
this.numReplicas = builder.numReplicas;
this.segmentServerMap = builder.segmentServerMap;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
@ -72,9 +75,14 @@ public class SegmentMetadataHolder
return segmentId;
}
public long getNumReplicas()
public Map<String, Set<String>> getReplicas()
{
return numReplicas;
return segmentServerMap;
}
public long getNumReplicas(String segmentId)
{
return segmentServerMap.get(segmentId).size();
}
public long getNumRows()
@ -95,7 +103,7 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
private long numReplicas;
private Map<String, Set<String>> segmentServerMap;
@Nullable
private RowSignature rowSignature;
private long numRows;
@ -105,14 +113,14 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
long numReplicas
Map<String, Set<String>> segmentServerMap
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
this.numReplicas = numReplicas;
this.segmentServerMap = segmentServerMap;
}
public Builder withRowSignature(RowSignature rowSignature)
@ -127,9 +135,9 @@ public class SegmentMetadataHolder
return this;
}
public Builder withNumReplicas(long numReplicas)
public Builder withReplicas(Map<String, Set<String>> segmentServerMap)
{
this.numReplicas = numReplicas;
this.segmentServerMap = segmentServerMap;
return this;
}
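With the builder reworked this way, callers pass the segmentId-to-servers map up front and read the replica count per segment id afterwards. A hypothetical usage sketch (illustrative segment id and server names; assumes the builder is reachable from the caller, as it is for DruidSchema in the same package):

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.sql.calcite.schema.SegmentMetadataHolder;

public class SegmentMetadataHolderUsageSketch
{
  public static long exampleReplicaCount()
  {
    final String segmentId = "test2_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version2_0";

    final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
        segmentId,
        1,  // isPublished
        1,  // isAvailable
        0,  // isRealtime
        ImmutableMap.of(segmentId, ImmutableSet.of("historical-1", "realtime-1"))
    ).build();

    // num_replicas is no longer stored directly; it is derived from the set size.
    return holder.getNumReplicas(segmentId);  // 2
  }
}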

SystemSchema.java

@ -229,7 +229,7 @@ public class SystemSchema extends AbstractSchema
final Map<String, PartialSegmentData> partialSegmentDataMap = availableSegmentMetadata.values().stream().collect(
Collectors.toMap(
SegmentMetadataHolder::getSegmentId,
h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows())
h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows())
));
//get published segments from coordinator
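Downstream, sys.segments only needs the derived count, which is why the PartialSegmentData construction above asks the holder for getNumReplicas(segmentId) instead of reading a stored field. A small sketch of that derivation step in isolation, with plain maps standing in for the holder and PartialSegmentData types (names and data are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NumReplicasDerivationSketch
{
  public static void main(String[] args)
  {
    // segmentId -> servers currently serving it (illustrative data).
    final Map<String, Set<String>> replicas = new LinkedHashMap<>();
    replicas.put("test1", Set.of("historical-1"));
    replicas.put("test2", Set.of("historical-1", "realtime-1"));

    // Mirrors the Collectors.toMap step above, keeping only the per-segment count.
    final Map<String, Long> numReplicas = replicas.entrySet().stream().collect(
        Collectors.toMap(Map.Entry::getKey, e -> (long) e.getValue().size())
    );

    System.out.println(numReplicas.get("test1")); // 1
    System.out.println(numReplicas.get("test2")); // 2
  }
}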

SystemSchemaTest.java

@ -199,7 +199,6 @@ public class SystemSchemaTest extends CalciteTestBase
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
.add(segment1, index1)
.add(segment2, index2)
.add(segment2, index2)
.add(segment3, index2);
druidSchema = new DruidSchema(
@ -282,7 +281,7 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT
);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment2, segment4, segment5);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
@ -478,6 +477,7 @@ public class SystemSchemaTest extends CalciteTestBase
// segments test1, test2 are published and available
// segment test3 is served by historical but unpublished or unused
// segments test4, test5 are not published but available (realtime segments)
// segment test2 is both published and served by a realtime server.
Assert.assertEquals(8, rows.size());
@ -556,7 +556,7 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals(100L, row4[4]);
Assert.assertEquals("version2", row4[5]);
Assert.assertEquals(0L, row4[6]); //partition_num
Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
Assert.assertEquals(2L, row4[7]); //segment test2 is served by historical and realtime servers
Assert.assertEquals(3L, row4[8]); //numRows = 3
Assert.assertEquals(1L, row4[9]); //is_published
Assert.assertEquals(1L, row4[10]); //is_available

TestServerInventoryView.java

@ -51,8 +51,8 @@ public class TestServerInventoryView implements TimelineServerView
0
);
private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
"dummy",
"dummy",
"dummy2",
"dummy2",
null,
0,
ServerType.REALTIME,