mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Retry on dataSource metadata CAS failures where retrying might help. (#3728)
This retries when the start condition is met but SELECT -> INSERT/UPDATE fails, which indicates a race. If the start condition isn't met, there won't be any retrying.
This commit is contained in:
parent
27ab23ef44
commit
a8069f2441
@ -32,7 +32,6 @@ import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.google.common.io.BaseEncoding;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.indexing.overlord.DataSourceMetadata;
|
||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||
@ -96,6 +95,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
this.connector = connector;
|
||||
}
|
||||
|
||||
/**
 * Outcome of an attempt to move a dataSource's stored metadata from an expected start state to a new end state.
 * Returned by updateDataSourceMetadataWithHandle and used by the caller to decide between aborting the
 * transaction outright and asking for it to be retried.
 */
enum DataSourceMetadataUpdateResult
|
||||
{
|
||||
// The INSERT or compare-and-swap UPDATE affected exactly one row; the metadata was updated.
SUCCESS,
|
||||
// The existing metadata is not the expected start state. The caller aborts with a plain RuntimeException,
// so no retry is requested.
FAILURE,
|
||||
// The start state matched but the INSERT/UPDATE did not affect exactly one row, which indicates a race.
// The caller aborts with a RetryTransactionException so that the transaction can be attempted again.
TRY_AGAIN
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
@ -330,17 +336,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
final Set<DataSegment> inserted = Sets.newHashSet();
|
||||
|
||||
if (startMetadata != null) {
|
||||
final boolean success = updateDataSourceMetadataWithHandle(
|
||||
final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
|
||||
handle,
|
||||
dataSource,
|
||||
startMetadata,
|
||||
endMetadata
|
||||
);
|
||||
|
||||
if (!success) {
|
||||
if (result != DataSourceMetadataUpdateResult.SUCCESS) {
|
||||
transactionStatus.setRollbackOnly();
|
||||
txnFailure.set(true);
|
||||
throw new RuntimeException("Aborting transaction!");
|
||||
|
||||
if (result == DataSourceMetadataUpdateResult.FAILURE) {
|
||||
throw new RuntimeException("Aborting transaction!");
|
||||
} else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
|
||||
throw new RetryTransactionException("Aborting transaction!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -698,7 +709,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
*
|
||||
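* @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE if the existing state is not the expected start state, or TRY_AGAIN if the compare-and-swap INSERT/UPDATE did not take effect and retrying might help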
|
||||
*/
|
||||
private boolean updateDataSourceMetadataWithHandle(
|
||||
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
|
||||
final Handle handle,
|
||||
final String dataSource,
|
||||
final DataSourceMetadata startMetadata,
|
||||
@ -730,7 +741,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
if (!startMetadataMatchesExisting) {
|
||||
// Not in the desired start state.
|
||||
log.info("Not updating metadata, existing state is not the expected start state.");
|
||||
return false;
|
||||
return DataSourceMetadataUpdateResult.FAILURE;
|
||||
}
|
||||
|
||||
final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null
|
||||
@ -741,7 +752,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
|
||||
);
|
||||
|
||||
final boolean retVal;
|
||||
final DataSourceMetadataUpdateResult retVal;
|
||||
if (oldCommitMetadataBytesFromDb == null) {
|
||||
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
|
||||
final int numRows = handle.createStatement(
|
||||
@ -757,7 +768,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
.bind("commit_metadata_sha1", newCommitMetadataSha1)
|
||||
.execute();
|
||||
|
||||
retVal = numRows > 0;
|
||||
retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
|
||||
} else {
|
||||
// Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE
|
||||
final int numRows = handle.createStatement(
|
||||
@ -775,10 +786,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
|
||||
.execute();
|
||||
|
||||
retVal = numRows > 0;
|
||||
retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
|
||||
}
|
||||
|
||||
if (retVal) {
|
||||
if (retVal == DataSourceMetadataUpdateResult.SUCCESS) {
|
||||
log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, newCommitMetadata);
|
||||
} else {
|
||||
log.info("Not updating metadata, compare-and-swap failure.");
|
||||
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.metadata;
|
||||
|
||||
/**
|
||||
* Exception thrown by SQL connector code when it wants a transaction to be retried. This exception is checked for
|
||||
* by {@link SQLMetadataConnector#isTransientException(Throwable)}.
|
||||
*/
|
||||
public class RetryTransactionException extends RuntimeException
|
||||
{
|
||||
/**
 * Creates a retry-requesting exception.
 *
 * @param message detail message, passed unchanged to the {@link RuntimeException} constructor
 */
public RetryTransactionException(String message)
|
||||
{
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -23,11 +23,9 @@ import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.RetryUtils;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.skife.jdbi.v2.Batch;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
@ -159,7 +157,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||
|
||||
public final boolean isTransientException(Throwable e)
|
||||
{
|
||||
return e != null && (e instanceof SQLTransientException
|
||||
return e != null && (e instanceof RetryTransactionException
|
||||
|| e instanceof SQLTransientException
|
||||
|| e instanceof SQLRecoverableException
|
||||
|| e instanceof UnableToObtainConnectionException
|
||||
|| e instanceof UnableToExecuteStatementException
|
||||
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.druid.indexing.overlord.DataSourceMetadata;
|
||||
import io.druid.indexing.overlord.ObjectMetadata;
|
||||
import io.druid.indexing.overlord.SegmentPublishResult;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
@ -41,6 +42,7 @@ import org.skife.jdbi.v2.util.StringMapper;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
{
|
||||
@ -98,7 +100,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
);
|
||||
|
||||
private final Set<DataSegment> SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
|
||||
IndexerSQLMetadataStorageCoordinator coordinator;
|
||||
private final AtomicLong metadataUpdateCounter = new AtomicLong();
|
||||
private IndexerSQLMetadataStorageCoordinator coordinator;
|
||||
private TestDerbyConnector derbyConnector;
|
||||
|
||||
@Before
|
||||
@ -109,11 +112,26 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
derbyConnector.createDataSourceTable();
|
||||
derbyConnector.createTaskTables();
|
||||
derbyConnector.createSegmentTable();
|
||||
metadataUpdateCounter.set(0);
|
||||
coordinator = new IndexerSQLMetadataStorageCoordinator(
|
||||
mapper,
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get(),
|
||||
derbyConnector
|
||||
);
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
|
||||
Handle handle,
|
||||
String dataSource,
|
||||
DataSourceMetadata startMetadata,
|
||||
DataSourceMetadata endMetadata
|
||||
) throws IOException
|
||||
{
|
||||
// Count number of times this method is called.
|
||||
metadataUpdateCounter.getAndIncrement();
|
||||
return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void unUseSegment()
|
||||
@ -176,6 +194,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()),
|
||||
getUsedIdentifiers()
|
||||
);
|
||||
|
||||
// Should not update dataSource metadata.
|
||||
Assert.assertEquals(0, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -244,6 +265,86 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz")),
|
||||
coordinator.getDataSourceMetadata("fooDataSource")
|
||||
);
|
||||
|
||||
// Should only be tried once per call.
|
||||
Assert.assertEquals(2, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
/**
 * Publishes two segments through a coordinator whose first metadata-update attempt per publish is forced to
 * report TRY_AGAIN, and checks that both publishes still succeed, that the segment payloads and the final
 * dataSource metadata are stored, and that the metadata update was attempted twice per publish.
 */
@Test
|
||||
public void testTransactionalAnnounceRetryAndSuccess() throws IOException
|
||||
{
|
||||
// Number of attempts since the last reset; attempt 0 is the one that gets the injected TRY_AGAIN.
final AtomicLong attemptCounter = new AtomicLong();
|
||||
|
||||
// Coordinator subclass that fails the first metadata update with TRY_AGAIN and delegates afterwards.
final IndexerSQLMetadataStorageCoordinator failOnceCoordinator = new IndexerSQLMetadataStorageCoordinator(
|
||||
mapper,
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get(),
|
||||
derbyConnector
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
|
||||
Handle handle,
|
||||
String dataSource,
|
||||
DataSourceMetadata startMetadata,
|
||||
DataSourceMetadata endMetadata
|
||||
) throws IOException
|
||||
{
|
||||
// Count every invocation, including the injected failure, so the total can be asserted at the end.
metadataUpdateCounter.getAndIncrement();
|
||||
if (attemptCounter.getAndIncrement() == 0) {
|
||||
return DataSourceMetadataUpdateResult.TRY_AGAIN;
|
||||
} else {
|
||||
return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Insert first segment.
|
||||
final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments(
|
||||
ImmutableSet.of(defaultSegment),
|
||||
new ObjectMetadata(null),
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
|
||||
);
|
||||
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
|
||||
|
||||
// The stored payload for the first segment must equal its serialized JSON form.
Assert.assertArrayEquals(
|
||||
mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
|
||||
derbyConnector.lookup(
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
|
||||
"id",
|
||||
"payload",
|
||||
defaultSegment.getIdentifier()
|
||||
)
|
||||
);
|
||||
|
||||
// Reset attempt counter to induce another failure.
|
||||
attemptCounter.set(0);
|
||||
|
||||
// Insert second segment.
|
||||
final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments(
|
||||
ImmutableSet.of(defaultSegment2),
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
|
||||
);
|
||||
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
|
||||
|
||||
// The stored payload for the second segment must equal its serialized JSON form.
Assert.assertArrayEquals(
|
||||
mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"),
|
||||
derbyConnector.lookup(
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
|
||||
"id",
|
||||
"payload",
|
||||
defaultSegment2.getIdentifier()
|
||||
)
|
||||
);
|
||||
|
||||
// Examine metadata.
|
||||
Assert.assertEquals(
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz")),
|
||||
failOnceCoordinator.getDataSourceMetadata("fooDataSource")
|
||||
);
|
||||
|
||||
// Should be tried twice per call.
|
||||
// Two publishes, each with one injected TRY_AGAIN plus one delegated attempt, gives four invocations.
Assert.assertEquals(4, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -255,6 +356,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
|
||||
);
|
||||
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result1);
|
||||
|
||||
// Should only be tried once.
|
||||
Assert.assertEquals(1, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -273,6 +377,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
|
||||
);
|
||||
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result2);
|
||||
|
||||
// Should only be tried once per call.
|
||||
Assert.assertEquals(2, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -291,6 +398,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
|
||||
);
|
||||
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result2);
|
||||
|
||||
// Should only be tried once per call.
|
||||
Assert.assertEquals(2, metadataUpdateCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
x
Reference in New Issue
Block a user