Fix NPE in SegmentSchemaCache (#16404)

Verify that the schema backfill count metric is emitted for each datasource.
Fix a potential NPE in SegmentSchemaCache#markMetadataQueryResultPublished.
Rishabh Singh 2024-05-09 11:13:53 +05:30 committed by GitHub
parent eb4e957db1
commit a6ebb963c7
4 changed files with 60 additions and 12 deletions
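
For context on the NPE: the old markInMetadataQueryResultPublished only logged when a segment was missing from temporaryMetadataQueryResults and then unconditionally copied the looked-up (and therefore null) value into the published-results map. A minimal, self-contained sketch of that failure mode follows; it assumes the caches are ConcurrentHashMaps (which reject null values) and uses plain String stand-ins for Druid's SegmentId and SchemaPayloadPlus types.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the pre-fix failure mode: the containsKey() check only logs, so a
// missing entry still reaches put(), and ConcurrentHashMap rejects null values.
public class NullValuePutDemo
{
  public static void main(String[] args)
  {
    final ConcurrentMap<String, String> temporaryResults = new ConcurrentHashMap<>();
    final ConcurrentMap<String, String> publishedResults = new ConcurrentHashMap<>();

    // Hypothetical segment id that was never added to (or was already removed from) the temporary map.
    final String segmentId = "foo_2023-01-01_2023-01-02_2023-01-01";

    if (!temporaryResults.containsKey(segmentId)) {
      System.err.println("SegmentId [" + segmentId + "] not found, but execution continues");
    }
    // get() returns null here, and ConcurrentHashMap.put(key, null) throws NullPointerException.
    publishedResults.put(segmentId, temporaryResults.get(segmentId));
  }
}

The patched method in the second file below instead fetches the entry once, branches on null, and only publishes when the entry is present.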

SegmentSchemaBackFillQueue.java

@@ -178,7 +178,7 @@ public class SegmentSchemaBackFillQueue
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
-segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId());
+segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()

SegmentSchemaCache.java

@@ -173,13 +173,15 @@ public class SegmentSchemaCache
* After the metadata query result is published to the DB, it is removed from temporaryMetadataQueryResults
* and added to temporaryPublishedMetadataQueryResults.
*/
-public void markInMetadataQueryResultPublished(SegmentId segmentId)
+public void markMetadataQueryResultPublished(SegmentId segmentId)
{
-if (!temporaryMetadataQueryResults.containsKey(segmentId)) {
+SchemaPayloadPlus temporaryMetadataQueryResult = temporaryMetadataQueryResults.get(segmentId);
+if (temporaryMetadataQueryResult == null) {
log.error("SegmentId [%s] not found in temporaryMetadataQueryResults map.", segmentId);
+} else {
+temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResult);
+}
-temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResults.get(segmentId));
temporaryMetadataQueryResults.remove(segmentId);
}
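
Read end to end, the patched method looks as follows (reconstructed from the hunk above; the surrounding field and logger declarations are implied by the diff, not shown here):

public void markMetadataQueryResultPublished(SegmentId segmentId)
{
  // Fetch once instead of containsKey() followed by get(), so the null check and
  // the publish step always see the same value.
  SchemaPayloadPlus temporaryMetadataQueryResult = temporaryMetadataQueryResults.get(segmentId);
  if (temporaryMetadataQueryResult == null) {
    log.error("SegmentId [%s] not found in temporaryMetadataQueryResults map.", segmentId);
  } else {
    // Only publish when the entry exists; previously a missing entry meant put(segmentId, null).
    temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResult);
  }
  // Whether published or not, the temporary entry is dropped.
  temporaryMetadataQueryResults.remove(segmentId);
}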

SegmentSchemaBackFillQueueTest.java

@@ -26,7 +26,9 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
@@ -41,7 +43,9 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
public class SegmentSchemaBackFillQueueTest
@@ -51,7 +55,8 @@ public class SegmentSchemaBackFillQueueTest
}
@Rule
-public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
+public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
+new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig());
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@@ -78,13 +83,15 @@ public class SegmentSchemaBackFillQueueTest
CountDownLatch latch = new CountDownLatch(1);
+StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
SegmentSchemaBackFillQueue segmentSchemaBackFillQueue =
new SegmentSchemaBackFillQueue(
segmentSchemaManager,
ScheduledExecutors::fixed,
segmentSchemaCache,
new FingerprintGenerator(mapper),
-new NoopServiceEmitter(),
+emitter,
config
) {
@Override
@@ -95,7 +102,7 @@ public class SegmentSchemaBackFillQueueTest
}
};
-final DataSegment segment = new DataSegment(
+final DataSegment segment1 = new DataSegment(
"foo",
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-01",
@@ -106,18 +113,53 @@ public class SegmentSchemaBackFillQueueTest
9,
100
);
-segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment), Collections.emptyMap());
+final DataSegment segment2 = new DataSegment(
+"foo",
+Intervals.of("2023-01-02/2023-01-03"),
+"2023-02-01",
+ImmutableMap.of("path", "a-1"),
+ImmutableList.of("dim1"),
+ImmutableList.of("m1"),
+new LinearShardSpec(0),
+9,
+100
+);
+final DataSegment segment3 = new DataSegment(
+"foo1",
+Intervals.of("2023-01-01/2023-01-02"),
+"2023-01-01",
+ImmutableMap.of("path", "a-1"),
+ImmutableList.of("dim1"),
+ImmutableList.of("m1"),
+new LinearShardSpec(0),
+9,
+100
+);
+Set<DataSegment> segments = new HashSet<>();
+segments.add(segment1);
+segments.add(segment2);
+segments.add(segment3);
+segmentSchemaTestUtils.insertUsedSegments(segments, Collections.emptyMap());
final Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new HashMap<>();
RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
-segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
-segmentSchemaBackFillQueue.add(segment.getId(), rowSignature, aggregatorFactoryMap, 20);
+segmentIdSchemaMap.put(segment1.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+segmentIdSchemaMap.put(segment2.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+segmentIdSchemaMap.put(segment3.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
+segmentSchemaBackFillQueue.add(segment1.getId(), rowSignature, aggregatorFactoryMap, 20);
+segmentSchemaBackFillQueue.add(segment2.getId(), rowSignature, aggregatorFactoryMap, 20);
+segmentSchemaBackFillQueue.add(segment3.getId(), rowSignature, aggregatorFactoryMap, 20);
segmentSchemaBackFillQueue.onLeaderStart();
latch.await();
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
+emitter.verifyValue("metadatacache/backfill/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 2);
+emitter.verifyValue("metadatacache/backfill/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo1"), 1);
}
private CentralizedDatasourceSchemaConfig getEnabledConfig()

SegmentSchemaCacheTest.java

@@ -56,13 +56,17 @@ public class SegmentSchemaCacheTest
}
@Test
-public void testCacheInTransitSMQResult()
+public void testCacheTemporaryMetadataQueryResults()
{
SegmentSchemaCache cache = new SegmentSchemaCache(new NoopServiceEmitter());
RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build();
SchemaPayloadPlus expected = new SchemaPayloadPlus(new SchemaPayload(rowSignature, Collections.emptyMap()), 20L);
SegmentId id = SegmentId.dummy("ds");
+// this call shouldn't result in any error
+cache.markMetadataQueryResultPublished(id);
cache.addTemporaryMetadataQueryResult(id, rowSignature, Collections.emptyMap(), 20);
Assert.assertTrue(cache.isSchemaCached(id));
@@ -70,7 +74,7 @@ public class SegmentSchemaCacheTest
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
-cache.markInMetadataQueryResultPublished(id);
+cache.markMetadataQueryResultPublished(id);
schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());