fix DruidSchema issue where datasources with no segments can become stuck in tables list indefinitely (#12727)

This commit is contained in:
Clint Wylie 2022-07-01 18:54:01 -07:00 committed by GitHub
parent f5b5cb93ea
commit bbbb6e1c3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 2 deletions

View File

@ -84,7 +84,8 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
ImmutableList.of() ImmutableList.of()
); );
} }
catch (Exception ignored) { catch (Exception e) {
LOG.error(e, "Failed to post load rules");
} }
}); });
@ -127,6 +128,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE) replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE)
); );
} }
finally { finally {
closer.close(); closer.close();

View File

@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumSet; import java.util.EnumSet;
@ -402,6 +403,11 @@ public class DruidSchema extends AbstractSchema
// Rebuild the dataSources. // Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) { for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource); final DruidTable druidTable = buildDruidTable(dataSource);
if (druidTable == null) {
log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}
final DruidTable oldTable = tables.put(dataSource, druidTable); final DruidTable oldTable = tables.put(dataSource, druidTable);
final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource"; final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@ -773,12 +779,13 @@ public class DruidSchema extends AbstractSchema
} }
@VisibleForTesting @VisibleForTesting
@Nullable
DruidTable buildDruidTable(final String dataSource) DruidTable buildDruidTable(final String dataSource)
{ {
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource); ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
final Map<String, ColumnType> columnTypes = new TreeMap<>(); final Map<String, ColumnType> columnTypes = new TreeMap<>();
if (segmentsMap != null) { if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) { for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) {
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature(); final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) { if (rowSignature != null) {
@ -792,6 +799,9 @@ public class DruidSchema extends AbstractSchema
} }
} }
} }
} else {
// table has no segments
return null;
} }
final RowSignature.Builder builder = RowSignature.builder(); final RowSignature.Builder builder = RowSignature.builder();

View File

@ -77,6 +77,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -1199,6 +1200,17 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
); );
} }
@Test
public void testStaleDatasourceRefresh() throws IOException
{
Set<SegmentId> segments = new HashSet<>();
Set<String> datasources = new HashSet<>();
datasources.add("wat");
Assert.assertNull(schema.getTable("wat"));
schema.refresh(segments, datasources);
Assert.assertNull(schema.getTable("wat"));
}
private static DataSegment newSegment(String datasource, int partitionId) private static DataSegment newSegment(String datasource, int partitionId)
{ {
return new DataSegment( return new DataSegment(