mirror of https://github.com/apache/druid.git
SQLMetadataConnector: Fix overzealous retries that were preventing EntryExistsException from making it out.
This commit is contained in:
parent
c82b680cd4
commit
102fc92120
|
@ -100,7 +100,10 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||||
|
|
||||||
public abstract boolean tableExists(Handle handle, final String tableName);
|
public abstract boolean tableExists(Handle handle, final String tableName);
|
||||||
|
|
||||||
public <T> T retryWithHandle(final HandleCallback<T> callback)
|
public <T> T retryWithHandle(
|
||||||
|
final HandleCallback<T> callback,
|
||||||
|
final Predicate<Throwable> myShouldRetry
|
||||||
|
)
|
||||||
{
|
{
|
||||||
final Callable<T> call = new Callable<T>()
|
final Callable<T> call = new Callable<T>()
|
||||||
{
|
{
|
||||||
|
@ -112,13 +115,18 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||||
};
|
};
|
||||||
final int maxTries = 10;
|
final int maxTries = 10;
|
||||||
try {
|
try {
|
||||||
return RetryUtils.retry(call, shouldRetry, maxTries);
|
return RetryUtils.retry(call, myShouldRetry, maxTries);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T> T retryWithHandle(final HandleCallback<T> callback)
|
||||||
|
{
|
||||||
|
return retryWithHandle(callback, shouldRetry);
|
||||||
|
}
|
||||||
|
|
||||||
public <T> T retryTransaction(final TransactionCallback<T> callback)
|
public <T> T retryTransaction(final TransactionCallback<T> callback)
|
||||||
{
|
{
|
||||||
final Callable<T> call = new Callable<T>()
|
final Callable<T> call = new Callable<T>()
|
||||||
|
@ -399,21 +407,24 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createRulesTable() {
|
public void createRulesTable()
|
||||||
|
{
|
||||||
if (config.get().isCreateTables()) {
|
if (config.get().isCreateTables()) {
|
||||||
createRulesTable(tablesConfigSupplier.get().getRulesTable());
|
createRulesTable(tablesConfigSupplier.get().getRulesTable());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createConfigTable() {
|
public void createConfigTable()
|
||||||
|
{
|
||||||
if (config.get().isCreateTables()) {
|
if (config.get().isCreateTables()) {
|
||||||
createConfigTable(tablesConfigSupplier.get().getConfigTable());
|
createConfigTable(tablesConfigSupplier.get().getConfigTable());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createTaskTables() {
|
public void createTaskTables()
|
||||||
|
{
|
||||||
if (config.get().isCreateTables()) {
|
if (config.get().isCreateTables()) {
|
||||||
final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
|
final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
|
||||||
final String entryType = tablesConfig.getTaskEntryType();
|
final String entryType = tablesConfig.getTaskEntryType();
|
||||||
|
@ -502,7 +513,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void createAuditTable() {
|
public void createAuditTable()
|
||||||
|
{
|
||||||
if (config.get().isCreateTables()) {
|
if (config.get().isCreateTables()) {
|
||||||
createAuditTable(tablesConfigSupplier.get().getAuditTable());
|
createAuditTable(tablesConfigSupplier.get().getAuditTable());
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.metadata;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -115,6 +116,17 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
|
||||||
.execute();
|
.execute();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
new Predicate<Throwable>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean apply(Throwable e)
|
||||||
|
{
|
||||||
|
final boolean isStatementException = e instanceof StatementException ||
|
||||||
|
(e instanceof CallbackFailedException
|
||||||
|
&& e.getCause() instanceof StatementException);
|
||||||
|
return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -41,6 +42,9 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
@Rule
|
@Rule
|
||||||
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>> handler;
|
private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>> handler;
|
||||||
|
|
||||||
|
@ -176,6 +180,19 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10_000L)
|
||||||
|
public void testRepeatInsert() throws Exception
|
||||||
|
{
|
||||||
|
final String entryId = "abcd";
|
||||||
|
Map<String, Integer> entry = ImmutableMap.of("a", 1);
|
||||||
|
Map<String, Integer> status = ImmutableMap.of("count", 42);
|
||||||
|
|
||||||
|
handler.insert(entryId, new DateTime("2014-01-01"), "test", entry, true, status);
|
||||||
|
|
||||||
|
thrown.expect(EntryExistsException.class);
|
||||||
|
handler.insert(entryId, new DateTime("2014-01-01"), "test", entry, true, status);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLogs() throws Exception
|
public void testLogs() throws Exception
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue