From a6ebb963c726094f2acee651274176af12a5fe78 Mon Sep 17 00:00:00 2001 From: Rishabh Singh <6513075+findingrish@users.noreply.github.com> Date: Thu, 9 May 2024 11:13:53 +0530 Subject: [PATCH] Fix NPE in SegmentSchemaCache (#16404) Verify that schema backfill count metric is emitted for each datasource. Fix potential NPE in SegmentSchemaCache#markMetadataQueryResultPublished. --- .../metadata/SegmentSchemaBackFillQueue.java | 2 +- .../segment/metadata/SegmentSchemaCache.java | 8 +-- .../SegmentSchemaBackFillQueueTest.java | 54 ++++++++++++++++--- .../metadata/SegmentSchemaCacheTest.java | 8 ++- 4 files changed, 60 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java index c2995e3087e..66ce9ed4bde 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java @@ -178,7 +178,7 @@ public class SegmentSchemaBackFillQueue segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); // Mark the segments as published in the cache. for (SegmentSchemaMetadataPlus plus : entry.getValue()) { - segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId()); + segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId()); } emitter.emit( ServiceMetricEvent.builder() diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java index e2fb1681792..c28e2b693bb 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java @@ -173,13 +173,15 @@ public class SegmentSchemaCache * After, metadata query result is published to the DB, it is removed from temporaryMetadataQueryResults * and added to temporaryPublishedMetadataQueryResults. */ - public void markInMetadataQueryResultPublished(SegmentId segmentId) + public void markMetadataQueryResultPublished(SegmentId segmentId) { - if (!temporaryMetadataQueryResults.containsKey(segmentId)) { + SchemaPayloadPlus temporaryMetadataQueryResult = temporaryMetadataQueryResults.get(segmentId); + if (temporaryMetadataQueryResult == null) { log.error("SegmentId [%s] not found in temporaryMetadataQueryResults map.", segmentId); + } else { + temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResult); } - temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResults.get(segmentId)); temporaryMetadataQueryResults.remove(segmentId); } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java index 238b5acc544..2229525c30a 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueueTest.java @@ -26,7 +26,9 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory; import org.apache.druid.segment.SchemaPayload; @@ -41,7 +43,9 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; public class SegmentSchemaBackFillQueueTest @@ -51,7 +55,8 @@ public class SegmentSchemaBackFillQueueTest } @Rule - public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig()); + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = + new TestDerbyConnector.DerbyConnectorRule(getEnabledConfig()); private final ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -78,13 +83,15 @@ public class SegmentSchemaBackFillQueueTest CountDownLatch latch = new CountDownLatch(1); + StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host"); + SegmentSchemaBackFillQueue segmentSchemaBackFillQueue = new SegmentSchemaBackFillQueue( segmentSchemaManager, ScheduledExecutors::fixed, segmentSchemaCache, new FingerprintGenerator(mapper), - new NoopServiceEmitter(), + emitter, config ) { @Override @@ -95,7 +102,7 @@ public class SegmentSchemaBackFillQueueTest } }; - final DataSegment segment = new DataSegment( + final DataSegment segment1 = new DataSegment( "foo", Intervals.of("2023-01-01/2023-01-02"), "2023-01-01", @@ -106,18 +113,53 @@ public class SegmentSchemaBackFillQueueTest 9, 100 ); - segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment), Collections.emptyMap()); + + final DataSegment segment2 = new DataSegment( + "foo", + Intervals.of("2023-01-02/2023-01-03"), + "2023-02-01", + ImmutableMap.of("path", "a-1"), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + + final DataSegment segment3 = new DataSegment( + "foo1", + Intervals.of("2023-01-01/2023-01-02"), + "2023-01-01", + ImmutableMap.of("path", "a-1"), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + Set segments = new HashSet<>(); + segments.add(segment1); + segments.add(segment2); + segments.add(segment3); + segmentSchemaTestUtils.insertUsedSegments(segments, Collections.emptyMap()); final Map> segmentIdSchemaMap = new HashMap<>(); RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build(); Map aggregatorFactoryMap = new HashMap<>(); aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null)); - segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20)); - segmentSchemaBackFillQueue.add(segment.getId(), rowSignature, aggregatorFactoryMap, 20); + segmentIdSchemaMap.put(segment1.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20)); + segmentIdSchemaMap.put(segment2.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20)); + segmentIdSchemaMap.put(segment3.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20)); + + segmentSchemaBackFillQueue.add(segment1.getId(), rowSignature, aggregatorFactoryMap, 20); + segmentSchemaBackFillQueue.add(segment2.getId(), rowSignature, aggregatorFactoryMap, 20); + segmentSchemaBackFillQueue.add(segment3.getId(), rowSignature, aggregatorFactoryMap, 20); segmentSchemaBackFillQueue.onLeaderStart(); latch.await(); segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap); + emitter.verifyValue("metadatacache/backfill/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 2); + emitter.verifyValue("metadatacache/backfill/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo1"), 1); } private CentralizedDatasourceSchemaConfig getEnabledConfig() diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java index 234b16bd9b5..f89c305b9db 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java @@ -56,13 +56,17 @@ public class SegmentSchemaCacheTest } @Test - public void testCacheInTransitSMQResult() + public void testCacheTemporaryMetadataQueryResults() { SegmentSchemaCache cache = new SegmentSchemaCache(new NoopServiceEmitter()); RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build(); SchemaPayloadPlus expected = new SchemaPayloadPlus(new SchemaPayload(rowSignature, Collections.emptyMap()), 20L); SegmentId id = SegmentId.dummy("ds"); + + // this call shouldn't result in any error + cache.markMetadataQueryResultPublished(id); + cache.addTemporaryMetadataQueryResult(id, rowSignature, Collections.emptyMap(), 20); Assert.assertTrue(cache.isSchemaCached(id)); @@ -70,7 +74,7 @@ public class SegmentSchemaCacheTest Assert.assertTrue(schema.isPresent()); Assert.assertEquals(expected, schema.get()); - cache.markInMetadataQueryResultPublished(id); + cache.markMetadataQueryResultPublished(id); schema = cache.getSchemaForSegment(id); Assert.assertTrue(schema.isPresent());