diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 49254c1da5c..bfc7784caf3 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -811,16 +811,14 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach mergedColumnTypes.put(column, columnType); } - Map columnMapping = segmentSchema.getColumnTypeMap(); + final Map columnMapping = segmentSchema.getColumnTypeMap(); // column type to be updated is not present in the existing schema - boolean missingUpdateColumns = false; - // new column to be added is already present in the existing schema - boolean existingNewColumns = false; + final Set missingUpdateColumns = new HashSet<>(); for (String column : segmentSchema.getUpdatedColumns()) { if (!mergedColumnTypes.containsKey(column)) { - missingUpdateColumns = true; + missingUpdateColumns.add(column); mergedColumnTypes.put(column, columnMapping.get(column)); } else { mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column))); @@ -829,23 +827,24 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach for (String column : segmentSchema.getNewColumns()) { if (mergedColumnTypes.containsKey(column)) { - existingNewColumns = true; mergedColumnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnMapping.get(column))); } else { mergedColumnTypes.put(column, columnMapping.get(column)); } } - if (missingUpdateColumns || existingNewColumns) { + if (!missingUpdateColumns.isEmpty()) { log.makeAlert( - "Error merging delta schema update with existing row signature. segmentId [%s], " - + "existingSignature [%s], deltaSchema [%s], missingUpdateColumns [%s], existingNewColumns [%s].", - segmentId, - existingSignature, - segmentSchema, - missingUpdateColumns, - existingNewColumns - ).emit(); + "Datasource schema mismatch detected. The delta realtime segment schema contains columns " + + "that are not defined in the datasource schema. " + + "This indicates a potential issue with schema updates on the Coordinator. " + + "Please review relevant Coordinator metrics and logs for task communication to identify any issues." + ) + .addData("datasource", segmentId.getDataSource()) + .addData("existingSignature", existingSignature) + .addData("deltaSchema", segmentSchema) + .addData("missingUpdateColumns", missingUpdateColumns) + .emit(); } mergedColumnTypes.forEach(builder::add);