mirror of https://github.com/apache/druid.git
Add new index on datasource and task_allocator_id for pending segments (#16355)
* Add pending segments index on datasource and task_allocator_id * Use both datasource and task_allocator_id in queries
This commit is contained in:
parent
e695e52d3f
commit
42e99bf912
|
@ -1242,7 +1242,10 @@ public class TaskLockbox
|
|||
idsInSameGroup.remove(task.getId());
|
||||
if (idsInSameGroup.isEmpty()) {
|
||||
final int pendingSegmentsDeleted
|
||||
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
|
||||
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
|
||||
task.getDataSource(),
|
||||
taskAllocatorId
|
||||
);
|
||||
log.info(
|
||||
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
|
||||
pendingSegmentsDeleted, taskAllocatorId
|
||||
|
|
|
@ -1948,8 +1948,10 @@ public class TaskLockboxTest
|
|||
// Only the replaceTask should attempt a delete on the upgradeSegments table
|
||||
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
|
||||
// Any task may attempt pending segment clean up
|
||||
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
|
||||
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
|
||||
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(), replaceTask.getId()))
|
||||
.andReturn(0).once();
|
||||
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(), appendTask.getId()))
|
||||
.andReturn(0).once();
|
||||
EasyMock.replay(coordinator);
|
||||
|
||||
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
|
||||
|
|
|
@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
|
|||
}
|
||||
|
||||
@Override
|
||||
public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
|
||||
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
|
|
@ -492,10 +492,11 @@ public interface IndexerMetadataStorageCoordinator
|
|||
|
||||
/**
|
||||
* Delete pending segments for a given task group after all the tasks belonging to it have completed.
|
||||
* @param datasource datasource of the task
|
||||
* @param taskAllocatorId task id / task group / replica group for an appending task
|
||||
* @return number of pending segments deleted from the metadata store
|
||||
*/
|
||||
int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
|
||||
int deletePendingSegmentsForTaskAllocatorId(String datasource, String taskAllocatorId);
|
||||
|
||||
/**
|
||||
* Fetches all the pending segments of the datasource that overlap with a given interval.
|
||||
|
|
|
@ -2813,17 +2813,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
|||
}
|
||||
|
||||
@Override
|
||||
public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup)
|
||||
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
|
||||
{
|
||||
return connector.getDBI().inTransaction(
|
||||
(handle, status) -> handle
|
||||
.createStatement(
|
||||
StringUtils.format(
|
||||
"DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
|
||||
"DELETE FROM %s WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id",
|
||||
dbTables.getPendingSegmentsTable()
|
||||
)
|
||||
)
|
||||
.bind("task_allocator_id", pendingSegmentsGroup)
|
||||
.bind("dataSource", datasource)
|
||||
.bind("task_allocator_id", taskAllocatorId)
|
||||
.execute()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -294,7 +294,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
|||
)
|
||||
)
|
||||
);
|
||||
alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
|
||||
alterPendingSegmentsTable(tableName);
|
||||
}
|
||||
|
||||
public void createDataSourceTable(final String tableName)
|
||||
|
@ -481,7 +481,16 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
|||
}
|
||||
}
|
||||
|
||||
private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
|
||||
/**
|
||||
* Adds the following columns to the pending segments table to clean up unused records,
|
||||
* and to facilitate concurrent append and replace.
|
||||
* 1) task_allocator_id -> The task id / task group id / task replica group id of the task that allocated it.
|
||||
* 2) upgraded_from_segment_id -> The id of the segment from which the entry was upgraded upon concurrent replace.
|
||||
*
|
||||
* Also, adds an index on (dataSource, task_allocator_id)
|
||||
* @param tableName name of the pending segments table
|
||||
*/
|
||||
private void alterPendingSegmentsTable(final String tableName)
|
||||
{
|
||||
List<String> statements = new ArrayList<>();
|
||||
if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
|
||||
|
@ -499,6 +508,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
|||
if (!statements.isEmpty()) {
|
||||
alterTable(tableName, statements);
|
||||
}
|
||||
|
||||
final Set<String> createdIndexSet = getIndexOnTable(tableName);
|
||||
createIndex(
|
||||
tableName,
|
||||
StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName),
|
||||
ImmutableList.of("dataSource", "task_allocator_id"),
|
||||
createdIndexSet
|
||||
);
|
||||
}
|
||||
|
||||
public void createLogTable(final String tableName, final String entryTypeName)
|
||||
|
|
Loading…
Reference in New Issue