diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 26c7bb74f1d..98ad25d7419 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.java.util.common.IAE; @@ -267,14 +269,31 @@ public class SegmentAllocateAction implements TaskAction } if (lockResult.isOk()) { - final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( - dataSource, - sequenceName, - previousSegmentId, - tryInterval, - lockResult.getTaskLock().getVersion(), - skipSegmentLineageCheck - ); + final SegmentIdentifier identifier; + try { + identifier = toolbox.getTaskLockbox().doInCriticalSection( + task, + ImmutableList.of(tryInterval), + CriticalAction.builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + lockResult.getTaskLock().getVersion(), + skipSegmentLineageCheck + ) + ).onInvalidLocks( + () -> null + ) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + if (identifier != null) { return identifier; } else { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index d021e4b1f26..ccd23733513 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -387,11 +387,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "maxVersion"); - return connector.retryTransaction( - new TransactionCallback() + return connector.retryWithHandle( + new HandleCallback() { @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + public SegmentIdentifier withHandle(Handle handle) throws Exception { return skipSegmentLineageCheck ? allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) : @@ -404,9 +404,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor maxVersion ); } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + } ); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2d8e3e5ee6e..ddd33d1b159 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -33,6 +33,7 @@ import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.TransactionCallback; +import org.skife.jdbi.v2.TransactionIsolationLevel; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; @@ -144,7 +145,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector public T retryTransaction(final TransactionCallback callback, final int quietTries, final int maxTries) { try { - return RetryUtils.retry(() -> getDBI().inTransaction(callback), shouldRetry, quietTries, maxTries); + return RetryUtils.retry(() -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), shouldRetry, quietTries, maxTries); } catch (Exception e) { throw Throwables.propagate(e);