mirror of https://github.com/apache/druid.git
Use PreparedBatch while deleting segments (#14639)
Related to #14634

Changes:
- Update `IndexerSQLMetadataStorageCoordinator.deleteSegments` to use JDBI
  PreparedBatch instead of issuing single DELETE statements
This commit is contained in:
parent efb32810c4
commit 54f29fedce
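For context before the diff itself, here is a minimal standalone sketch of the JDBI v2 PreparedBatch idiom this commit switches to: one prepared DELETE statement, one parameter set added per id, executed in a single transaction, with the per-statement row counts summed. The `DBI` wiring, the table-name parameter, and the class name are illustrative assumptions; only the `prepareBatch`/`bind`/`add`/`execute` call chain mirrors the patch.

import java.util.Arrays;
import java.util.List;

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;

public class PreparedBatchDeleteSketch
{
  /**
   * Deletes the rows with the given ids from {@code table} in one
   * transaction, using a single prepared statement executed as a JDBC batch.
   * Returns the total number of rows deleted.
   */
  public static int deleteByIds(final DBI dbi, final String table, final List<String> ids)
  {
    final String deleteSql = String.format("DELETE FROM %s WHERE id = :id", table);

    return dbi.inTransaction((handle, transactionStatus) -> {
      final PreparedBatch batch = handle.prepareBatch(deleteSql);
      for (final String id : ids) {
        // each bind(...).add() appends one parameter set to the batch
        batch.bind("id", id).add();
      }
      // execute() returns one affected-row count per batch entry
      final int[] deletedRows = batch.execute();
      return Arrays.stream(deletedRows).sum();
    });
  }
}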
@@ -1773,32 +1773,31 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
   @Override
   public void deleteSegments(final Set<DataSegment> segments)
   {
-    connector.getDBI().inTransaction(
-        new TransactionCallback<Void>()
-        {
-          @Override
-          public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
-          {
-            int segmentSize = segments.size();
-            String dataSource = "";
-            for (final DataSegment segment : segments) {
-              dataSource = segment.getDataSource();
-              deleteSegment(handle, segment);
-            }
-            log.debugSegments(segments, "Delete the metadata of segments");
-            log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
-
-            return null;
-          }
-        }
-    );
-  }
-
-  private void deleteSegment(final Handle handle, final DataSegment segment)
-  {
-    handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()))
-          .bind("id", segment.getId().toString())
-          .execute();
-  }
+    if (segments.isEmpty()) {
+      log.info("No segments to delete.");
+      return;
+    }
+
+    final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
+    final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();
+
+    // generate the IDs outside the transaction block
+    final List<String> ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList());
+
+    int numDeletedSegments = connector.getDBI().inTransaction((handle, transactionStatus) -> {
+          final PreparedBatch batch = handle.prepareBatch(deleteSql);
+
+          for (final String id : ids) {
+            batch.bind("id", id).add();
+          }
+
+          int[] deletedRows = batch.execute();
+          return Arrays.stream(deletedRows).sum();
+        }
+    );
+
+    log.debugSegments(segments, "Delete the metadata of segments");
+    log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", numDeletedSegments, dataSource);
+  }
 
   private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
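One caveat worth noting about the new return value (an observation, not something the patch addresses): `PreparedBatch.execute()` surfaces the underlying JDBC batch counts, and the JDBC spec allows a driver to report `java.sql.Statement.SUCCESS_NO_INFO` (-2) for an entry instead of an exact row count, which a raw sum would count as negative. A sketch of a defensive aggregation, with a hypothetical helper name:

import java.sql.Statement;
import java.util.Arrays;

public class BatchCountSketch
{
  // Sums JDBC batch update counts, skipping entries where the driver
  // reported SUCCESS_NO_INFO (-2) rather than an exact row count.
  public static int countAffectedRows(final int[] updateCounts)
  {
    return Arrays.stream(updateCounts)
                 .filter(count -> count != Statement.SUCCESS_NO_INFO)
                 .sum();
  }
}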
@@ -146,6 +146,18 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       100
   );
 
+  private final DataSegment defaultSegment2WithBiggerSize = new DataSegment(
+      "fooDataSource",
+      Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+      "version",
+      ImmutableMap.of(),
+      ImmutableList.of("dim1"),
+      ImmutableList.of("m1"),
+      new LinearShardSpec(1),
+      9,
+      200
+  );
+
   private final DataSegment defaultSegment3 = new DataSegment(
       "fooDataSource",
       Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
@@ -1413,6 +1425,46 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  @Test
+  public void testUpdateSegmentsInMetaDataStorage() throws IOException
+  {
+    // Published segments to MetaDataStorage
+    coordinator.announceHistoricalSegments(SEGMENTS);
+
+    // check segments Published
+    Assert.assertEquals(
+        SEGMENTS,
+        ImmutableSet.copyOf(
+            coordinator.retrieveUsedSegmentsForInterval(
+                defaultSegment.getDataSource(),
+                defaultSegment.getInterval(),
+                Segments.ONLY_VISIBLE
+            )
+        )
+    );
+
+    // update single metadata item
+    coordinator.updateSegmentMetadata(Collections.singleton(defaultSegment2WithBiggerSize));
+
+    Collection<DataSegment> updated = coordinator.retrieveUsedSegmentsForInterval(
+        defaultSegment.getDataSource(),
+        defaultSegment.getInterval(),
+        Segments.ONLY_VISIBLE);
+
+    Assert.assertEquals(SEGMENTS.size(), updated.size());
+
+    DataSegment defaultAfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment)).findFirst().get();
+    DataSegment default2AfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment2)).findFirst().get();
+
+    Assert.assertNotNull(defaultAfterUpdate);
+    Assert.assertNotNull(default2AfterUpdate);
+
+    // check that default did not change
+    Assert.assertEquals(defaultSegment.getSize(), defaultAfterUpdate.getSize());
+    // but that default 2 did change
+    Assert.assertEquals(defaultSegment2WithBiggerSize.getSize(), default2AfterUpdate.getSize());
+  }
+
   @Test
   public void testSingleAdditionalNumberedShardWithNoCorePartitions() throws IOException
   {