Fix num_rows in sys.segments (#6888)

* Fix the bug with num_rows in sys.segments

* Fix segmentMetadataInfo update in DruidSchema
* Add numRows to SegmentMetadataHolder builder's constructor, so it's not overwritten
* Rename setSegmentSignature to setSegmentMetadataHolder and fix it so the nested map is appended to instead of recreated
* Replace the Map<SegmentId, Set<String>> segmentServerMap with a plain Set<String>, from which num_replicas is derived

* Remove unnecessary code and update test

* Add unit test for num_rows

* Address PR comments

* Change access modifiers to package-private

* Minor changes to comments

* Address PR comments
Authored by Surekha on 2019-02-11 16:21:19 -08:00, committed by Jihoon Son
parent 16a4a50e91
commit 02ef14f262
6 changed files with 162 additions and 49 deletions
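
The gist of the fix, before the per-file hunks: DruidSchema#addSegment used to rebuild a segment's SegmentMetadataHolder from scratch whenever another replica of the segment was announced, and the old builder took no numRows argument, so a row count filled in by an earlier metadata refresh was silently reset. The sketch below is a minimal, self-contained illustration of the copy-on-update pattern the patch adopts; Holder is a pared-down stand-in for SegmentMetadataHolder, not the real Druid class, and the server names are made up.

import com.google.common.collect.ImmutableSet;
import java.util.Set;

class Holder
{
  final Set<String> servers;
  final long numRows;

  Holder(Set<String> servers, long numRows)
  {
    this.servers = servers;
    this.numRows = numRows;
  }

  // from() copies every field, including numRows; the pre-fix builder dropped it
  static Holder from(Holder h)
  {
    return new Holder(h.servers, h.numRows);
  }

  Holder withNumRows(long numRows)
  {
    return new Holder(servers, numRows);
  }

  Holder withReplicas(Set<String> servers)
  {
    return new Holder(servers, numRows);
  }

  public static void main(String[] args)
  {
    Holder holder = new Holder(ImmutableSet.of("server1"), 0L);
    holder = holder.withNumRows(5L); // a metadata refresh fills in the row count

    // a second replica is announced: copy the holder instead of rebuilding it from defaults
    Set<String> servers = new ImmutableSet.Builder<String>().addAll(holder.servers).add("server2").build();
    holder = Holder.from(holder).withReplicas(servers);

    System.out.println(holder.numRows + " rows, " + holder.servers.size() + " replicas"); // 5 rows, 2 replicas
  }
}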

sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java

@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
+import com.amazonaws.annotation.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -63,7 +64,6 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Comparator;
@@ -95,8 +95,9 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
-private static final long IS_PUBLISHED = 0;
-private static final long IS_AVAILABLE = 1;
+private static final long DEFAULT_IS_PUBLISHED = 0;
+private static final long DEFAULT_IS_AVAILABLE = 1;
+private static final long DEFAULT_NUM_ROWS = 0;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
@@ -107,12 +108,12 @@ public class DruidSchema extends AbstractSchema
// For awaitInitialization.
private final CountDownLatch initialized = new CountDownLatch(1);
-// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
+// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
private final Object lock = new Object();
// DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
// This data structure need to be accessed in a thread-safe way since SystemSchema accesses it
+@GuardedBy("lock")
private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
private int totalSegments = 0;
@@ -351,7 +352,8 @@ public class DruidSchema extends AbstractSchema
return builder.build();
}
-private void addSegment(final DruidServerMetadata server, final DataSegment segment)
+@VisibleForTesting
+void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
@@ -360,16 +362,18 @@
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-final Map<SegmentId, Set<String>> serverSegmentMap = ImmutableMap.of(
-segment.getId(),
-ImmutableSet.of(server.getName())
-);
-holder = SegmentMetadataHolder
-.builder(segment.getId(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, serverSegmentMap)
-.build();
+final Set<String> servers = ImmutableSet.of(server.getName());
+holder = SegmentMetadataHolder.builder(
+segment.getId(),
+DEFAULT_IS_PUBLISHED,
+DEFAULT_IS_AVAILABLE,
+isRealtime,
+servers,
+null,
+DEFAULT_NUM_ROWS
+).build();
// Unknown segment.
-setSegmentSignature(segment, holder);
+setSegmentMetadataHolder(segment, holder);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getId());
@@ -378,14 +382,14 @@
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
-final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
+final Set<String> segmentServers = holder.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
-.addAll(segmentServerMap.get(segment.getId()))
+.addAll(segmentServers)
.add(server.getName())
.build();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
-.withReplicas(ImmutableMap.of(segment.getId(), servers))
+.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
if (server.segmentReplicatable()) {
@@ -404,7 +408,7 @@
}
@VisibleForTesting
-protected void removeSegment(final DataSegment segment)
+void removeSegment(final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId());
@@ -435,13 +439,13 @@
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final SegmentMetadataHolder holder = knownSegments.get(segment);
-final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
-final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getId()))
+final Set<String> segmentServers = holder.getReplicas();
+final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server.getName())))
.toSet();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
-.withReplicas(ImmutableMap.of(segment.getId(), servers))
+.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
lock.notifyAll();
@@ -453,7 +457,7 @@
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
-protected Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> retVal = new HashSet<>();
@@ -525,7 +529,7 @@
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
-setSegmentSignature(segment, updatedHolder);
+setSegmentMetadataHolder(segment, updatedHolder);
retVal.add(segment);
}
}
@@ -550,7 +554,8 @@
return retVal;
}
-private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
+@VisibleForTesting
+void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
{
synchronized (lock) {
TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
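
The truncated hunk above ends inside the renamed setSegmentMetadataHolder; per the commit message, it now appends to the nested per-datasource map via computeIfAbsent instead of recreating it. A minimal sketch of the difference, with String segment ids and Long row counts standing in for DataSegment and SegmentMetadataHolder:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class ComputeIfAbsentDemo
{
  public static void main(String[] args)
  {
    Map<String, TreeMap<String, Long>> metadataByDataSource = new HashMap<>();

    // appends to the existing nested map, creating it only on first use
    metadataByDataSource.computeIfAbsent("foo", ds -> new TreeMap<>()).put("segment1", 3L);
    metadataByDataSource.computeIfAbsent("foo", ds -> new TreeMap<>()).put("segment2", 5L);
    System.out.println(metadataByDataSource.get("foo").size()); // 2

    // the buggy pattern: unconditionally recreating the nested map would lose both entries
    metadataByDataSource.put("foo", new TreeMap<>());
    System.out.println(metadataByDataSource.get("foo").size()); // 0
  }
}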

sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java

@@ -23,7 +23,6 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
-import java.util.Map;
import java.util.Set;
/**
@@ -36,15 +35,25 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
-Map<SegmentId, Set<String>> segmentServerMap
+Set<String> segmentServers,
+RowSignature rowSignature,
+long numRows
)
{
-return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServerMap);
+return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
}
public static Builder from(SegmentMetadataHolder h)
{
-return new Builder(h.getSegmentId(), h.isPublished(), h.isAvailable(), h.isRealtime(), h.getReplicas());
+return new Builder(
+h.getSegmentId(),
+h.isPublished(),
+h.isAvailable(),
+h.isRealtime(),
+h.getReplicas(),
+h.getRowSignature(),
+h.getNumRows()
+);
}
private final SegmentId segmentId;
@@ -54,8 +63,8 @@ public class SegmentMetadataHolder
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
-//segmentId -> set of servers that contain the segment
-private final Map<SegmentId, Set<String>> segmentServerMap;
+// set of servers that contain the segment
+private final Set<String> segmentServers;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
@@ -66,7 +75,7 @@ public class SegmentMetadataHolder
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
-this.segmentServerMap = builder.segmentServerMap;
+this.segmentServers = builder.segmentServers;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
@@ -91,14 +100,14 @@ public class SegmentMetadataHolder
return segmentId;
}
-public Map<SegmentId, Set<String>> getReplicas()
+public Set<String> getReplicas()
{
-return segmentServerMap;
+return segmentServers;
}
-public long getNumReplicas(SegmentId segmentId)
+public long getNumReplicas()
{
-return segmentServerMap.get(segmentId).size();
+return segmentServers.size();
}
public long getNumRows()
@@ -119,7 +128,7 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
-private Map<SegmentId, Set<String>> segmentServerMap;
+private Set<String> segmentServers;
@Nullable
private RowSignature rowSignature;
private long numRows;
@@ -129,14 +138,18 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
-Map<SegmentId, Set<String>> segmentServerMap
+Set<String> servers,
+RowSignature rowSignature,
+long numRows
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
-this.segmentServerMap = segmentServerMap;
+this.segmentServers = servers;
+this.rowSignature = rowSignature;
+this.numRows = numRows;
}
public Builder withRowSignature(RowSignature rowSignature)
@@ -151,9 +164,9 @@ public class SegmentMetadataHolder
return this;
}
-public Builder withReplicas(Map<SegmentId, Set<String>> segmentServerMap)
+public Builder withReplicas(Set<String> servers)
{
-this.segmentServerMap = segmentServerMap;
+this.segmentServers = servers;
return this;
}
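
Taken together, the holder's builder now threads the replica set, the row signature, and the row count through both builder() and from(), so no field is lost on copy. A short usage sketch written against the signatures visible in the hunks above (it assumes this patched sql module is on the classpath; the argument values and server name are illustrative only):

import com.google.common.collect.ImmutableSet;
import org.apache.druid.sql.calcite.schema.SegmentMetadataHolder;
import org.apache.druid.timeline.SegmentId;

class HolderCopySketch
{
  // Build a holder the way DruidSchema#addSegment now does, then copy it the way the
  // refresh path does; from() carries the replica set forward, withNumRows overrides one field.
  static SegmentMetadataHolder refresh(SegmentId segmentId, long numRows)
  {
    SegmentMetadataHolder holder = SegmentMetadataHolder
        .builder(segmentId, 0L /* isPublished */, 1L /* isAvailable */, 0L /* isRealtime */,
            ImmutableSet.of("historical1"), null /* rowSignature */, 0L /* numRows */)
        .build();
    return SegmentMetadataHolder.from(holder).withNumRows(numRows).build();
  }
}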

sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java

@@ -224,7 +224,7 @@ public class SystemSchema extends AbstractSchema
Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
PartialSegmentData partialSegmentData =
-new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows());
+new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}
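
This one-line change is the payoff of the Map-to-Set refactor: callers previously had to echo the holder's own segment id back at it as h.getNumReplicas(h.getSegmentId()). A stand-in sketch of the simplified shape (not the real Druid classes; server names are made up):

import com.google.common.collect.ImmutableSet;
import java.util.Set;

class ReplicaCountSketch
{
  private final Set<String> segmentServers;

  ReplicaCountSketch(Set<String> segmentServers)
  {
    this.segmentServers = segmentServers;
  }

  long getNumReplicas()
  {
    return segmentServers.size(); // was: segmentServerMap.get(segmentId).size()
  }

  public static void main(String[] args)
  {
    System.out.println(new ReplicaCountSketch(ImmutableSet.of("historical1", "historical2")).getNumReplicas()); // 2
  }
}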

sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java

@@ -27,6 +27,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@@ -40,6 +42,7 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
@@ -84,6 +87,8 @@ public class DruidSchemaTest extends CalciteTestBase
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
+private List<ImmutableDruidServer> druidServers;
@BeforeClass
public static void setUpClass()
{
@@ -163,10 +168,12 @@ public class DruidSchemaTest extends CalciteTestBase
index2
);
+final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments());
+druidServers = serverView.getDruidServers();
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-new TestServerInventoryView(walker.getSegments()),
+serverView,
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@@ -239,6 +246,62 @@
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
}
+/**
+ * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case
+ * of multiple replicas, i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
+ * is called more than once for the same segment.
+ */
+@Test
+public void testSegmentMetadataHolderNumRows()
+{
+Map<DataSegment, SegmentMetadataHolder> segmentsMetadata = schema.getSegmentMetadata();
+final Set<DataSegment> segments = segmentsMetadata.keySet();
+Assert.assertEquals(3, segments.size());
+// find the only segment with datasource "foo2"
+final DataSegment existingSegment = segments.stream()
+.filter(segment -> segment.getDataSource().equals("foo2"))
+.findFirst()
+.orElse(null);
+Assert.assertNotNull(existingSegment);
+final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment);
+// update the SegmentMetadataHolder of existingSegment with numRows = 5
+SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
+schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
+// find a druidServer holding existingSegment
+final Pair<ImmutableDruidServer, DataSegment> pair = druidServers.stream()
+.flatMap(druidServer -> druidServer.getSegments().stream()
+.filter(segment -> segment.equals(existingSegment))
+.map(segment -> Pair.of(druidServer, segment)))
+.findAny()
+.orElse(null);
+Assert.assertNotNull(pair);
+final ImmutableDruidServer server = pair.lhs;
+Assert.assertNotNull(server);
+final DruidServerMetadata druidServerMetadata = server.getMetadata();
+// invoke DruidSchema#addSegment on existingSegment
+schema.addSegment(druidServerMetadata, existingSegment);
+segmentsMetadata = schema.getSegmentMetadata();
+// get the only segment with datasource "foo2"
+final DataSegment currentSegment = segments.stream()
+.filter(segment -> segment.getDataSource().equals("foo2"))
+.findFirst()
+.orElse(null);
+final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment);
+Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId());
+Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows());
+// num_replicas does not change here since we call addSegment with the same server that was already serving existingSegment
+Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas());
+Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable());
+Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished());
+}
@Test
public void testNullDatasource() throws IOException
{
@@ -247,7 +310,10 @@
Assert.assertEquals(segments.size(), 3);
// segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2"
-final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null);
+final DataSegment segmentToRemove = segments.stream()
+.filter(segment -> segment.getDataSource().equals("foo2"))
+.findFirst()
+.orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
@@ -262,8 +328,11 @@
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
-//remove one of the segments with datasource "foo"
-final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null);
+// remove one of the segments with datasource "foo"
+final DataSegment segmentToRemove = segments.stream()
+.filter(segment -> segment.getDataSource().equals("foo"))
+.findFirst()
+.orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from
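
The num_replicas assertion in testSegmentMetadataHolderNumRows above depends on addSegment unioning replica sets immutably, as in the DruidSchema hunks earlier: re-announcing a server that is already in the set is a no-op. A self-contained sketch of that behavior (hypothetical server names):

import com.google.common.collect.ImmutableSet;
import java.util.Set;

public class ReplicaUnionDemo
{
  public static void main(String[] args)
  {
    // the pattern from DruidSchema#addSegment's known-segment branch
    Set<String> segmentServers = ImmutableSet.of("historical1");
    Set<String> servers = new ImmutableSet.Builder<String>()
        .addAll(segmentServers)
        .add("historical2")
        .build();
    System.out.println(servers.size()); // num_replicas == 2

    // ImmutableSet deduplicates, so re-adding the same server leaves the count unchanged
    Set<String> same = new ImmutableSet.Builder<String>().addAll(servers).add("historical2").build();
    System.out.println(same.size()); // still 2
  }
}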

sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java

@@ -118,6 +118,11 @@
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
);
+private static final List<InputRow> ROWS3 = ImmutableList.of(
+CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "7.0", "dim3", ImmutableList.of("x"))),
+CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "8.0", "dim3", ImmutableList.of("xyz")))
+);
private SystemSchema schema;
private SpecificSegmentsQuerySegmentWalker walker;
private DruidLeaderClient client;
@@ -204,11 +209,22 @@
)
.rows(ROWS2)
.buildMMappedIndex();
+final QueryableIndex index3 = IndexBuilder.create()
+.tmpDir(new File(tmpDir, "3"))
+.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+.schema(
+new IncrementalIndexSchema.Builder()
+.withMetrics(new LongSumAggregatorFactory("m1", "m1"))
+.withRollup(false)
+.build()
+)
+.rows(ROWS3)
+.buildMMappedIndex();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
.add(segment1, index1)
.add(segment2, index2)
-.add(segment3, index2);
+.add(segment3, index3);
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
@@ -469,7 +485,7 @@
100L,
2L, //partition_num
1L, //num_replicas
-3L, //numRows
+2L, //numRows
0L, //is_published
1L, //is_available
0L //is_realtime
@@ -481,7 +497,7 @@
100L,
0L, //partition_num
1L, //num_replicas
-0L, //numRows = 3
+0L, //numRows
0L, //is_published
1L, //is_available
1L //is_realtime

sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java

@@ -20,7 +20,9 @@
package org.apache.druid.sql.calcite.util;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.ServerSelector;
@@ -33,6 +35,7 @@ import org.apache.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
@@ -83,7 +86,14 @@ public class TestServerInventoryView implements TimelineServerView
@Override
public List<ImmutableDruidServer> getDruidServers()
{
-throw new UnsupportedOperationException();
+final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments);
+final ImmutableDruidServer server = new ImmutableDruidServer(
+DUMMY_SERVER,
+0L,
+ImmutableMap.of("src", dataSource),
+1
+);
+return ImmutableList.of(server);
}
@Override