Remove alert for pre-existing new columns while merging realtime schema (#16989)

Currently, an alert is raised while merging the datasource schema with a realtime
segment schema when the datasource schema already contains columns that the delta schema reports as new.

This isn't an error condition since the datasource schema can have those columns from a different segment.

One scenario in which this can occur is when multiple replicas of a task are run.
This commit is contained in:
Rishabh Singh 2024-09-05 07:58:24 +05:30 committed by GitHub
parent 9162339fa8
commit 4e02e5b856
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 14 additions and 15 deletions

View File

@ -811,16 +811,14 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
mergedColumnTypes.put(column, columnType);
}
Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
final Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
// column type to be updated is not present in the existing schema
boolean missingUpdateColumns = false;
// new column to be added is already present in the existing schema
boolean existingNewColumns = false;
final Set<String> missingUpdateColumns = new HashSet<>();
for (String column : segmentSchema.getUpdatedColumns()) {
if (!mergedColumnTypes.containsKey(column)) {
missingUpdateColumns = true;
missingUpdateColumns.add(column);
mergedColumnTypes.put(column, columnMapping.get(column));
} else {
mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column)));
@ -829,23 +827,24 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
for (String column : segmentSchema.getNewColumns()) {
if (mergedColumnTypes.containsKey(column)) {
existingNewColumns = true;
mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column)));
} else {
mergedColumnTypes.put(column, columnMapping.get(column));
}
}
if (missingUpdateColumns || existingNewColumns) {
if (!missingUpdateColumns.isEmpty()) {
log.makeAlert(
"Error merging delta schema update with existing row signature. segmentId [%s], "
+ "existingSignature [%s], deltaSchema [%s], missingUpdateColumns [%s], existingNewColumns [%s].",
segmentId,
existingSignature,
segmentSchema,
missingUpdateColumns,
existingNewColumns
).emit();
"Datasource schema mismatch detected. The delta realtime segment schema contains columns "
+ "that are not defined in the datasource schema. "
+ "This indicates a potential issue with schema updates on the Coordinator. "
+ "Please review relevant Coordinator metrics and logs for task communication to identify any issues."
)
.addData("datasource", segmentId.getDataSource())
.addData("existingSignature", existingSignature)
.addData("deltaSchema", segmentSchema)
.addData("missingUpdateColumns", missingUpdateColumns)
.emit();
}
mergedColumnTypes.forEach(builder::add);