Fix schema backfill count metric (#16536)

* Fix build

* Fix backfill metric

* Address review comment
This commit is contained in:
Rishabh Singh 2024-06-28 11:07:28 +05:30 committed by GitHub
parent b9c7664ac3
commit c96e783750
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 40 additions and 13 deletions

View File

@ -1153,7 +1153,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
return null;
}).list();
segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
});

View File

@ -179,7 +179,6 @@ public class SegmentSchemaBackFillQueue
entry.getValue(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.metadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
@ -145,6 +146,12 @@ public class SegmentSchemaCache
public void updateFinalizedSegmentSchema(FinalizedSegmentSchemaInfo finalizedSegmentSchemaInfo)
{
this.finalizedSegmentSchemaInfo = finalizedSegmentSchemaInfo;
// remove metadata for segments which have been polled in the last database poll
temporaryPublishedMetadataQueryResults
.keySet()
.removeAll(finalizedSegmentSchemaInfo.getFinalizedSegmentMetadata().keySet());
setInitialized();
}
@ -185,14 +192,6 @@ public class SegmentSchemaCache
temporaryMetadataQueryResults.remove(segmentId);
}
/**
* temporaryPublishedMetadataQueryResults is reset after each DB poll.
*/
public void resetTemporaryPublishedMetadataQueryResultOnDBPoll()
{
temporaryPublishedMetadataQueryResults.clear();
}
/**
* Fetch schema for a given segment. Note, that there is no check on schema version in this method,
* since schema corresponding to a particular version {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached.
@ -325,6 +324,12 @@ public class SegmentSchemaCache
);
}
@VisibleForTesting
SchemaPayloadPlus getTemporaryPublishedMetadataQueryResults(SegmentId id)
{
return temporaryPublishedMetadataQueryResults.get(id);
}
/**
* This class encapsulates schema information for segments polled from the DB.
*/

View File

@ -63,28 +63,52 @@ public class SegmentSchemaCacheTest
RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build();
SchemaPayloadPlus expected = new SchemaPayloadPlus(new SchemaPayload(rowSignature, Collections.emptyMap()), 20L);
SegmentId id = SegmentId.dummy("ds");
SegmentId id2 = SegmentId.dummy("ds2");
// this call shouldn't result in any error
cache.markMetadataQueryResultPublished(id);
cache.addTemporaryMetadataQueryResult(id, rowSignature, Collections.emptyMap(), 20);
cache.addTemporaryMetadataQueryResult(id2, rowSignature, Collections.emptyMap(), 20);
Assert.assertTrue(cache.isSchemaCached(id));
Assert.assertTrue(cache.isSchemaCached(id2));
Optional<SchemaPayloadPlus> schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
Optional<SchemaPayloadPlus> schema2 = cache.getSchemaForSegment(id);
Assert.assertTrue(schema2.isPresent());
Assert.assertEquals(expected, schema2.get());
cache.markMetadataQueryResultPublished(id);
cache.markMetadataQueryResultPublished(id2);
schema = cache.getSchemaForSegment(id);
Assert.assertTrue(schema.isPresent());
Assert.assertEquals(expected, schema.get());
cache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
// simulate call after segment polling
Assert.assertFalse(cache.isSchemaCached(id));
ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = ImmutableMap.builder();
segmentMetadataBuilder.put(id, new SegmentMetadata(5L, "fp"));
ImmutableMap.Builder<String, SchemaPayload> schemaPayloadBuilder = ImmutableMap.builder();
schemaPayloadBuilder.put("fp", new SchemaPayload(rowSignature));
SegmentSchemaCache.FinalizedSegmentSchemaInfo finalizedSegmentSchemaInfo =
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentMetadataBuilder.build(), schemaPayloadBuilder.build());
cache.updateFinalizedSegmentSchema(finalizedSegmentSchemaInfo);
Assert.assertNull(cache.getTemporaryPublishedMetadataQueryResults(id));
Assert.assertNotNull(cache.getTemporaryPublishedMetadataQueryResults(id2));
Assert.assertTrue(cache.isSchemaCached(id));
Assert.assertTrue(cache.isSchemaCached(id2));
schema = cache.getSchemaForSegment(id);
Assert.assertFalse(schema.isPresent());
Assert.assertTrue(schema.isPresent());
schema2 = cache.getSchemaForSegment(id2);
Assert.assertTrue(schema2.isPresent());
}
@Test