From 42e99bf91202ee3505f277bd2eb59d473506ad00 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Tue, 30 Apr 2024 15:48:16 +0530 Subject: [PATCH] Add new index on datasource and task_allocator_id for pending segments (#16355) * Add pending segments index on datasource and task_allocator_id * Use both datasource and task_allocator_id in queries --- .../druid/indexing/overlord/TaskLockbox.java | 5 ++++- .../indexing/overlord/TaskLockboxTest.java | 6 ++++-- ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 3 ++- .../IndexerSQLMetadataStorageCoordinator.java | 7 ++++--- .../druid/metadata/SQLMetadataConnector.java | 21 +++++++++++++++++-- 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 5d71940d470..1a1369ba1e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1242,7 +1242,10 @@ public class TaskLockbox idsInSameGroup.remove(task.getId()); if (idsInSameGroup.isEmpty()) { final int pendingSegmentsDeleted - = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId); + = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId( + task.getDataSource(), + taskAllocatorId + ); log.info( "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", pendingSegmentsDeleted, taskAllocatorId diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 3af74235e82..3c95709a9fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1948,8 +1948,10 @@ public class TaskLockboxTest // Only the replaceTask should attempt a delete on the upgradeSegments table EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); // Any task may attempt pending segment clean up - EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once(); - EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(), replaceTask.getId())) + .andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(), appendTask.getId())) + .andReturn(0).once(); EasyMock.replay(coordinator); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 6c4f556133e..3aceae494c6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto } @Override - public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index aea2674f6b8..da6dd9ffd95 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -492,10 +492,11 @@ public interface IndexerMetadataStorageCoordinator /** * Delete pending segment for a give task group after all the tasks belonging to it have completed. + * @param datasource datasource of the task * @param taskAllocatorId task id / task group / replica group for an appending task * @return number of pending segments deleted from the metadata store */ - int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId); + int deletePendingSegmentsForTaskAllocatorId(String datasource, String taskAllocatorId); /** * Fetches all the pending segments of the datasource that overlap with a given interval. diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index c5a36656c9a..618173a5db7 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2813,17 +2813,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } @Override - public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId) { return connector.getDBI().inTransaction( (handle, status) -> handle .createStatement( StringUtils.format( - "DELETE FROM %s WHERE task_allocator_id = :task_allocator_id", + "DELETE FROM %s WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id", dbTables.getPendingSegmentsTable() ) ) - .bind("task_allocator_id", pendingSegmentsGroup) + .bind("dataSource", datasource) + .bind("task_allocator_id", taskAllocatorId) .execute() ); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 94452de00ee..cb85548d5ba 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -294,7 +294,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector ) ) ); - alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName); + alterPendingSegmentsTable(tableName); } public void createDataSourceTable(final String tableName) @@ -481,7 +481,16 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector } } - private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName) + /** + * Adds the following columns to the pending segments table to clean up unused records, + * and to faciliatate concurrent append and replace. + * 1) task_allocator_id -> The task id / task group id / task replica group id of the task that allocated it. + * 2) upgraded_from_segment_id -> The id of the segment from which the entry was upgraded upon concurrent replace. + * + * Also, adds an index on (dataSource, task_allocator_id) + * @param tableName name of the pending segments table + */ + private void alterPendingSegmentsTable(final String tableName) { List statements = new ArrayList<>(); if (tableHasColumn(tableName, "upgraded_from_segment_id")) { @@ -499,6 +508,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector if (!statements.isEmpty()) { alterTable(tableName, statements); } + + final Set createdIndexSet = getIndexOnTable(tableName); + createIndex( + tableName, + StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName), + ImmutableList.of("dataSource", "task_allocator_id"), + createdIndexSet + ); } public void createLogTable(final String tableName, final String entryTypeName)