Skip tombstone segment refresh in metadata cache (#17025)

PR #16890 introduced a change to skip adding tombstone segments to the cache.
It turns out that as a side effect tombstone segments appear unavailable in the console. This happens because availability of a segment in Broker is determined from the metadata cache.

The fix is to keep the segment in the metadata cache but skip them from refresh.

This doesn't affect any functionality, since segment metadata queries for tombstone segments return empty results, which would otherwise cause continuous refresh of those segments.
This commit is contained in:
Rishabh Singh 2024-09-13 11:47:11 +05:30 committed by GitHub
parent fff3e81dcc
commit a8c06e93aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 246 additions and 134 deletions

View File

@ -102,6 +102,13 @@ import java.util.stream.StreamSupport;
* <p>
* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override
* with the logic to build and cache table schema.
* <p>
* Note on handling tombstone segments:
* These segments lack data or column information.
 * Additionally, segment metadata queries, which are not yet implemented for tombstone segments
 * (see: https://github.com/apache/druid/pull/12137), do not provide metadata for tombstones,
 * leading to indefinite refresh attempts for these segments.
* Therefore, these segments are never added to the set of segments being refreshed.
*
* @param <T> The type of information associated with the data source, which must extend {@link DataSourceInformation}.
*/
@ -478,13 +485,6 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Skip adding tombstone segment to the cache. These segments lack data or column information.
// Additionally, segment metadata queries, which are not yet implemented for tombstone segments
// (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
// leading to indefinite refresh attempts for these segments.
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
@ -511,7 +511,11 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
markSegmentAsNeedRefresh(segment.getId());
if (segment.isTombstone()) {
log.debug("Skipping refresh for tombstone segment.");
} else {
markSegmentAsNeedRefresh(segment.getId());
}
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment [%s].", segment.getId());
markSegmentAsMutable(segment.getId());
@ -557,10 +561,6 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
// tombstone segments are not present in the cache
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());

View File

@ -374,9 +374,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
return availableSegmentMetadata;
}
}
@ -403,9 +401,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
}
return availableSegmentMetadata;
}
@ -686,22 +682,14 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (SegmentId segmentId : segmentsMap.keySet()) {
for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : segmentsMap.entrySet()) {
SegmentId segmentId = entry.getKey();
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
// mark it for refresh only if it is used
// however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
}
markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
}
}
} else {
@ -876,4 +864,32 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
return Optional.empty();
}
}
/**
 * Marks the given segment for refresh when a refresh could actually help.
 * A segment schema can go missing; to ensure smooth functioning, the segment is marked for refresh.
 * Refresh is deliberately skipped in two scenarios:
 * - Tombstone segments, since they do not have any schema.
 * - Unused segments which haven't yet been removed from the cache.
 * Any other scenario needs investigation.
 */
private void markSegmentForRefreshIfNeeded(DataSegment segment)
{
  final SegmentId segmentId = segment.getId();
  log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);

  // Tombstones carry no data or columns, so there is no schema to fetch.
  if (segment.isTombstone()) {
    log.debug("Skipping refresh for tombstone segment [%s].", segmentId);
    return;
  }

  // Only refresh segments that are still used; unused ones will be evicted from the cache.
  final ImmutableDruidDataSource dataSourceWithUsedSegments =
      sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());
  final boolean segmentIsUsed =
      dataSourceWithUsedSegments != null && dataSourceWithUsedSegments.getSegment(segmentId) != null;

  if (segmentIsUsed) {
    markSegmentAsNeedRefresh(segmentId);
  } else {
    log.debug("Skipping refresh for unused segment [%s].", segmentId);
  }
}
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@ -2220,74 +2221,109 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
}
@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
TestHelper.makeJsonMapper();
InternalQueryConfig internalQueryConfig = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
),
InternalQueryConfig.class
);
QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
factoryMock,
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};
schema.onLeaderStart();
schema.awaitInitialization();
DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);
Assert.assertEquals(6, schema.getTotalSegments());
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.PRIORITY_KEY, 5,
QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
);
serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());
DataSegment segment = newSegment("test", 0);
DataSegment tombstone = DataSegment.builder()
.dataSource("test")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(new TombstoneShardSpec())
.loadSpec(Collections.singletonMap(
"type",
DataSegment.TOMBSTONE_LOADSPEC_TYPE
))
.size(0)
.build();
Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
final DruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);
serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
schema.addSegment(historicalServerMetadata, segment);
schema.addSegment(historicalServerMetadata, tombstone);
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(segment.getDataSource()),
new MultipleSpecificSegmentSpec(
segmentIterable.stream()
.filter(id -> !id.equals(tombstone.getId()))
.map(SegmentId::toDescriptor)
.collect(Collectors.toList())
),
new AllColumnIncluderator(),
false,
queryContext,
EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
false,
null,
null
);
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
.andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();
EasyMock.replay(factoryMock, lifecycleMock);
schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test"));
// verify that metadata query is not issued for tombstone segment
EasyMock.verify(factoryMock, lifecycleMock);
// Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for tombstone segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
// iterating over entire metadata doesn't cause tombstone to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}
@Test
@ -2384,6 +2420,27 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
AvailableSegmentMetadata availableSegmentMetadata =
schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId());
Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for unused segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
Assert.assertEquals(
1,
metadatas.stream()
.filter(
metadata ->
metadata.getSegment().getId().equals(segments.get(0).getId())).count()
);
// iterating over entire metadata doesn't cause unused segment to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
}
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)

View File

@ -37,6 +37,7 @@ import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
@ -1139,71 +1140,109 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
}
@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
TestHelper.makeJsonMapper();
InternalQueryConfig internalQueryConfig = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
),
InternalQueryConfig.class
);
QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
factoryMock,
serverView,
BrokerSegmentMetadataCacheConfig.create(),
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
internalQueryConfig,
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};
schema.start();
schema.awaitInitialization();
DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);
Assert.assertEquals(6, schema.getTotalSegments());
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.PRIORITY_KEY, 5,
QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
);
serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());
DataSegment segment = newSegment("test", 0);
DataSegment tombstone = DataSegment.builder()
.dataSource("test")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(new TombstoneShardSpec())
.loadSpec(Collections.singletonMap(
"type",
DataSegment.TOMBSTONE_LOADSPEC_TYPE
))
.size(0)
.build();
Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
final ImmutableDruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);
serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
schema.addSegment(historicalServerMetadata, segment);
schema.addSegment(historicalServerMetadata, tombstone);
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(segment.getDataSource()),
new MultipleSpecificSegmentSpec(
segmentIterable.stream()
.filter(id -> !id.equals(tombstone.getId()))
.map(SegmentId::toDescriptor)
.collect(Collectors.toList())
),
new AllColumnIncluderator(),
false,
queryContext,
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
null,
null
);
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
.andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
EasyMock.replay(factoryMock, lifecycleMock);
Set<SegmentId> segmentsToRefresh = new HashSet<>();
segmentsToRefresh.add(segment.getId());
schema.refresh(segmentsToRefresh, Collections.singleton("test"));
// verify that metadata query is not issued for tombstone segment
EasyMock.verify(factoryMock, lifecycleMock);
// Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for tombstone segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
// iterating over entire metadata doesn't cause tombstone to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}
}