From a8c06e93aafb3fce9ce9d62b657da78286b61307 Mon Sep 17 00:00:00 2001 From: Rishabh Singh <6513075+findingrish@users.noreply.github.com> Date: Fri, 13 Sep 2024 11:47:11 +0530 Subject: [PATCH] Skip tombstone segment refresh in metadata cache (#17025) This PR #16890 introduced a change to skip adding tombstone segments to the cache. It turns out that as a side effect tombstone segments appear unavailable in the console. This happens because availability of a segment in Broker is determined from the metadata cache. The fix is to keep the segment in the metadata cache but skip them from refresh. This doesn't affect any functionality as metadata query for tombstone returns empty causing continuous refresh of those segments. --- .../AbstractSegmentMetadataCache.java | 24 +-- .../CoordinatorSegmentMetadataCache.java | 50 ++++-- .../CoordinatorSegmentMetadataCacheTest.java | 161 ++++++++++++------ .../BrokerSegmentMetadataCacheTest.java | 145 ++++++++++------ 4 files changed, 246 insertions(+), 134 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index d918ec5e3f2..99d965ec643 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -102,6 +102,13 @@ import java.util.stream.StreamSupport; *

* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override * with the logic to build and cache table schema. + *

+ * Note on handling tombstone segments: + * These segments lack data or column information. + * Additionally, segment metadata queries, which are not yet implemented for tombstone segments + * (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, + * leading to indefinite refresh attempts for these segments. + * Therefore, these segments are never added to the set of segments being refreshed. * * @param The type of information associated with the data source, which must extend {@link DataSourceInformation}. */ @@ -478,13 +485,6 @@ public abstract class AbstractSegmentMetadataCache columnTypes = new LinkedHashMap<>(); if (segmentsMap != null && !segmentsMap.isEmpty()) { - for (SegmentId segmentId : segmentsMap.keySet()) { + for (Map.Entry entry : segmentsMap.entrySet()) { + SegmentId segmentId = entry.getKey(); Optional optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId); if (optionalSchema.isPresent()) { RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); mergeRowSignature(columnTypes, rowSignature); } else { - log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); - - ImmutableDruidDataSource druidDataSource = - sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource()); - - if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) { - // mark it for refresh only if it is used - // however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); - } + markSegmentForRefreshIfNeeded(entry.getValue().getSegment()); } } } else { @@ -876,4 +864,32 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach return Optional.empty(); } } + + /** + * A segment schema can go missing. To ensure smooth functioning, segment is marked for refresh. + * It need not be refreshed in the following scenarios: + * - Tombstone segment, since they do not have any schema. 
+ * - Unused segment which hasn't been yet removed from the cache. + * Any other scenario needs investigation. + */ + private void markSegmentForRefreshIfNeeded(DataSegment segment) + { + SegmentId id = segment.getId(); + + log.debug("SchemaMetadata for segmentId [%s] is absent.", id); + + if (segment.isTombstone()) { + log.debug("Skipping refresh for tombstone segment [%s].", id); + return; + } + + ImmutableDruidDataSource druidDataSource = + sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource()); + + if (druidDataSource != null && druidDataSource.getSegment(id) != null) { + markSegmentAsNeedRefresh(id); + } else { + log.debug("Skipping refresh for unused segment [%s].", id); + } + } } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 0c099cb551c..22b0890e855 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -2220,74 +2221,109 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad } @Test - public void testTombstoneSegmentIsNotAdded() throws InterruptedException + public void testTombstoneSegmentIsNotRefreshed() throws IOException { - String datasource = "newSegmentAddTest"; - CountDownLatch addSegmentLatch = new CountDownLatch(1); + String 
brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }"; + + TestHelper.makeJsonMapper(); + InternalQueryConfig internalQueryConfig = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class) + ), + InternalQueryConfig.class + ); + + QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class); + QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( - getQueryLifecycleFactory(walker), + factoryMock, serverView, SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), - new InternalQueryConfig(), + internalQueryConfig, new NoopServiceEmitter(), segmentSchemaCache, backFillQueue, sqlSegmentsMetadataManager, segmentsMetadataManagerConfigSupplier - ) - { - @Override - public void addSegment(final DruidServerMetadata server, final DataSegment segment) - { - super.addSegment(server, segment); - if (datasource.equals(segment.getDataSource())) { - addSegmentLatch.countDown(); - } - } - }; - - schema.onLeaderStart(); - schema.awaitInitialization(); - - DataSegment segment = new DataSegment( - datasource, - Intervals.of("2001/2002"), - "1", - Collections.emptyMap(), - Collections.emptyList(), - Collections.emptyList(), - TombstoneShardSpec.INSTANCE, - null, - null, - 0 ); - Assert.assertEquals(6, schema.getTotalSegments()); + Map queryContext = ImmutableMap.of( + QueryContexts.PRIORITY_KEY, 5, + QueryContexts.BROKER_PARALLEL_MERGE_KEY, false + ); - serverView.addSegment(segment, ServerType.HISTORICAL); - Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(0, addSegmentLatch.getCount()); + DataSegment segment = newSegment("test", 0); + DataSegment tombstone = DataSegment.builder() + .dataSource("test") + .interval(Intervals.of("2012-01-01/2012-01-02")) + .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) + .shardSpec(new 
TombstoneShardSpec()) + .loadSpec(Collections.singletonMap( + "type", + DataSegment.TOMBSTONE_LOADSPEC_TYPE + )) + .size(0) + .build(); - Assert.assertEquals(6, schema.getTotalSegments()); - List metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + final DruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); - serverView.removeSegment(segment, ServerType.HISTORICAL); - Assert.assertEquals(6, schema.getTotalSegments()); - metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + schema.addSegment(historicalServerMetadata, segment); + schema.addSegment(historicalServerMetadata, tombstone); + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); + + SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(segment.getDataSource()), + new MultipleSpecificSegmentSpec( + segmentIterable.stream() + .filter(id -> !id.equals(tombstone.getId())) + .map(SegmentId::toDescriptor) + .collect(Collectors.toList()) + ), + new AllColumnIncluderator(), + false, + queryContext, + EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS), + false, + null, + null + ); + + EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); + EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) + 
.andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once(); + + EasyMock.replay(factoryMock, lifecycleMock); + + schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test")); + + // verify that metadata query is not issued for tombstone segment + EasyMock.verify(factoryMock, lifecycleMock); + + // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId()); + Assert.assertNotNull(availableSegmentMetadata); + // fetching metadata for tombstone segment shouldn't mark it for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + Set metadatas = new HashSet<>(); + schema.iterateSegmentMetadata().forEachRemaining(metadatas::add); + + Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count()); + + // iterating over entire metadata doesn't cause tombstone to be marked for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); } @Test @@ -2384,6 +2420,27 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId())); Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId())); + + AvailableSegmentMetadata availableSegmentMetadata = + schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId()); + + Assert.assertNotNull(availableSegmentMetadata); + // fetching metadata for unused segment shouldn't mark it for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId())); + + Set metadatas = new HashSet<>(); + schema.iterateSegmentMetadata().forEachRemaining(metadatas::add); + + Assert.assertEquals( 
+ 1,
+ metadatas.stream()
+ .filter(
+ metadata ->
+ metadata.getSegment().getId().equals(segments.get(0).getId())).count()
+ );
+
+ // iterating over entire metadata doesn't cause unused segment to be marked for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
 }
 
 private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index d9b24ed011d..b613c602f63 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -37,6 +37,7 @@ import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; @@ -1139,71 +1140,109 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe }

@Test
- public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+ public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
- String datasource = "newSegmentAddTest";
- CountDownLatch addSegmentLatch = new CountDownLatch(1);
+ String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
+
+ TestHelper.makeJsonMapper();
+ InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
+ ),
+ InternalQueryConfig.class
+ );
+
+ QueryLifecycleFactory factoryMock = 
EasyMock.createMock(QueryLifecycleFactory.class); + QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + factoryMock, serverView, - BrokerSegmentMetadataCacheConfig.create(), + SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), - new InternalQueryConfig(), + internalQueryConfig, new NoopServiceEmitter(), new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() - ) - { - @Override - public void addSegment(final DruidServerMetadata server, final DataSegment segment) - { - super.addSegment(server, segment); - if (datasource.equals(segment.getDataSource())) { - addSegmentLatch.countDown(); - } - } - }; - - schema.start(); - schema.awaitInitialization(); - - DataSegment segment = new DataSegment( - datasource, - Intervals.of("2001/2002"), - "1", - Collections.emptyMap(), - Collections.emptyList(), - Collections.emptyList(), - TombstoneShardSpec.INSTANCE, - null, - null, - 0 ); - Assert.assertEquals(6, schema.getTotalSegments()); + Map queryContext = ImmutableMap.of( + QueryContexts.PRIORITY_KEY, 5, + QueryContexts.BROKER_PARALLEL_MERGE_KEY, false + ); - serverView.addSegment(segment, ServerType.HISTORICAL); - Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(0, addSegmentLatch.getCount()); + DataSegment segment = newSegment("test", 0); + DataSegment tombstone = DataSegment.builder() + .dataSource("test") + .interval(Intervals.of("2012-01-01/2012-01-02")) + .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) + .shardSpec(new TombstoneShardSpec()) + .loadSpec(Collections.singletonMap( + "type", + DataSegment.TOMBSTONE_LOADSPEC_TYPE + )) + .size(0) + .build(); - Assert.assertEquals(6, schema.getTotalSegments()); - List metadatas = schema - .getSegmentMetadataSnapshot() - 
.values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + final ImmutableDruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); - serverView.removeSegment(segment, ServerType.HISTORICAL); - Assert.assertEquals(6, schema.getTotalSegments()); - metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + schema.addSegment(historicalServerMetadata, segment); + schema.addSegment(historicalServerMetadata, tombstone); + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); + + SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(segment.getDataSource()), + new MultipleSpecificSegmentSpec( + segmentIterable.stream() + .filter(id -> !id.equals(tombstone.getId())) + .map(SegmentId::toDescriptor) + .collect(Collectors.toList()) + ), + new AllColumnIncluderator(), + false, + queryContext, + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + null, + null + ); + + EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); + EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) + .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); + + EasyMock.replay(factoryMock, lifecycleMock); + + Set segmentsToRefresh = new HashSet<>(); + segmentsToRefresh.add(segment.getId()); + schema.refresh(segmentsToRefresh, 
Collections.singleton("test"));
+
+ // verify that metadata query is not issued for tombstone segment
+ EasyMock.verify(factoryMock, lifecycleMock);
+
+ // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
+ Assert.assertNotNull(availableSegmentMetadata);
+ // fetching metadata for tombstone segment shouldn't mark it for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ Set metadatas = new HashSet<>();
+ schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+ Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
+
+ // iterating over entire metadata doesn't cause tombstone to be marked for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
 }
}