update insert pending segments logic to synchronous (#6336)

* 1. MySQL's default transaction isolation is REPEATABLE_READ; running these transactions as READ_COMMITTED reduces insert id conflicts.
2. Adding an index on 'dataSource, used, end' works well for most scenarios (fetching recently used segments) and speeds up the synchronous insert of pending segments into the DB; see the sketch below.
3. The 'select and insert' does not need to run inside a transaction.
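
To make points 1 and 2 concrete, here is a minimal, self-contained sketch using JDBI v2 (the library this patch touches). The connection details, table name, and index name are illustrative placeholders, not the exact identifiers Druid generates:

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.TransactionIsolationLevel;
import org.skife.jdbi.v2.util.IntegerMapper;

public class PendingSegmentsSketch
{
  public static void main(String[] args)
  {
    // Placeholder connection details; point this at the MySQL metadata store.
    final DBI dbi = new DBI("jdbc:mysql://localhost:3306/druid", "druid", "diurd");

    // Point 2: an index on (dataSource, used, end) serves the common
    // "recently used segments for a dataSource" lookup and speeds up the
    // synchronous pending-segment insert. `end` is a MySQL reserved word,
    // hence the backticks.
    dbi.withHandle(
        handle -> handle.update(
            "CREATE INDEX idx_segments_datasource_used_end ON druid_segments (dataSource, used, `end`)"
        )
    );

    // Point 1: under MySQL's default REPEATABLE_READ, every read in a
    // transaction sees the snapshot taken at its first read, so two
    // concurrent allocators can compute the same new segment id and collide
    // on insert. READ_COMMITTED lets each statement see freshly committed
    // rows, which reduces those conflicts.
    final Integer used = dbi.inTransaction(
        TransactionIsolationLevel.READ_COMMITTED,
        (handle, status) -> handle.createQuery("SELECT COUNT(*) FROM druid_segments WHERE used = true")
                                  .map(IntegerMapper.FIRST)
                                  .first()
    );
    System.out.println("used segments: " + used);
  }
}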

* Use TaskLockbox.doInCriticalSection instead of the synchronized keyword to speed up inserting pending segments (a toy sketch of the pattern follows).
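
The difference matters under contention: a synchronized block serializes every allocation behind one monitor, while a critical section keyed by the task's locked intervals only serializes tasks that contend for the same interval. A toy illustration of that idea (this is not Druid's TaskLockbox, just the locking pattern, with a string interval key for brevity):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class IntervalCriticalSection
{
  private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  // Runs 'action' while holding the lock for 'intervalKey' only; allocations
  // for disjoint intervals proceed in parallel instead of queueing behind a
  // single synchronized monitor.
  <T> T doInCriticalSection(String intervalKey, Supplier<T> action)
  {
    final ReentrantLock lock = locks.computeIfAbsent(intervalKey, k -> new ReentrantLock());
    lock.lock();
    try {
      return action.get();
    }
    finally {
      lock.unlock();
    }
  }
}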

* fix typo for NullPointerException
Faxian Zhao 2018-10-23 10:48:20 +08:00 committed by Jihoon Son
parent 359576a80b
commit c5bf4e7503
3 changed files with 33 additions and 15 deletions

indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java

@@ -23,9 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.LockResult;
 import org.apache.druid.java.util.common.IAE;
@@ -267,14 +269,31 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
       }
 
       if (lockResult.isOk()) {
-        final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
-            dataSource,
-            sequenceName,
-            previousSegmentId,
-            tryInterval,
-            lockResult.getTaskLock().getVersion(),
-            skipSegmentLineageCheck
-        );
+        final SegmentIdentifier identifier;
+        try {
+          identifier = toolbox.getTaskLockbox().doInCriticalSection(
+              task,
+              ImmutableList.of(tryInterval),
+              CriticalAction.<SegmentIdentifier>builder()
+                            .onValidLocks(
+                                () -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
+                                    dataSource,
+                                    sequenceName,
+                                    previousSegmentId,
+                                    tryInterval,
+                                    lockResult.getTaskLock().getVersion(),
+                                    skipSegmentLineageCheck
+                                )
+                            )
+                            .onInvalidLocks(() -> null)
+                            .build()
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
         if (identifier != null) {
           return identifier;
         } else {
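
With the allocation wrapped in the lockbox critical section, the 'select and insert' inside allocatePendingSegment no longer needs a transaction (point 3 of the commit message): no concurrent allocator can interleave between the lookup and the insert for the same interval. A minimal sketch of that pattern, with illustrative table and column names rather than Druid's exact schema, and assuming the caller holds the critical section:

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.util.StringMapper;

class SelectThenInsertSketch
{
  // Assumed precondition: the caller holds the critical section for this
  // (dataSource, sequenceName); that guarantee replaces the transaction the
  // old code relied on.
  static String allocate(DBI dbi, String dataSource, String sequenceName)
  {
    return dbi.withHandle(handle -> {
      final String existing = handle
          .createQuery("SELECT segment_id FROM pending_segments WHERE dataSource = :ds AND sequence_name = :seq")
          .bind("ds", dataSource)
          .bind("seq", sequenceName)
          .map(StringMapper.FIRST)
          .first();
      if (existing != null) {
        return existing; // idempotent: reuse the id allocated earlier
      }
      final String newId = dataSource + "_" + sequenceName + "_" + System.nanoTime();
      handle.update(
          "INSERT INTO pending_segments (dataSource, sequence_name, segment_id) VALUES (?, ?, ?)",
          dataSource, sequenceName, newId
      );
      return newId;
    });
  }
}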

server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

@@ -387,11 +387,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     Preconditions.checkNotNull(interval, "interval");
     Preconditions.checkNotNull(maxVersion, "maxVersion");
 
-    return connector.retryTransaction(
-        new TransactionCallback<SegmentIdentifier>()
+    return connector.retryWithHandle(
+        new HandleCallback<SegmentIdentifier>()
         {
           @Override
-          public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          public SegmentIdentifier withHandle(Handle handle) throws Exception
           {
             return skipSegmentLineageCheck ?
                    allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) :
@@ -404,9 +404,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 maxVersion
             );
           }
-        },
-        ALLOCATE_SEGMENT_QUIET_TRIES,
-        SQLMetadataConnector.DEFAULT_MAX_TRIES
+        }
     );
   }
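
The switch from retryTransaction to retryWithHandle keeps the retry-on-transient-failure behavior but presumably hands the callback a bare Handle with no enclosing transaction, which is all the select-and-insert path now needs. A sketch of such a retry wrapper (not Druid's exact implementation):

import java.util.concurrent.Callable;
import java.util.function.Predicate;

final class RetrySketch
{
  // Retry 'action' up to maxTries times when shouldRetry deems the failure
  // transient; unlike retryTransaction, nothing here opens a transaction.
  static <T> T retryWithHandle(Callable<T> action, Predicate<Throwable> shouldRetry, int maxTries) throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return action.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries || !shouldRetry.test(e)) {
          throw e;
        }
      }
    }
  }
}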

server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java

@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionIsolationLevel;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.exceptions.DBIException;
 import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
@@ -144,7 +145,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
   public <T> T retryTransaction(final TransactionCallback<T> callback, final int quietTries, final int maxTries)
   {
     try {
-      return RetryUtils.retry(() -> getDBI().inTransaction(callback), shouldRetry, quietTries, maxTries);
+      return RetryUtils.retry(() -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), shouldRetry, quietTries, maxTries);
     }
     catch (Exception e) {
       throw Throwables.propagate(e);