Merge pull request #1665 from gianm/consolidate-sql-retrying

Consolidate SQL retrying by moving logic into the connectors.
This commit is contained in:
Fangjin Yang 2015-08-25 12:43:48 -07:00
commit 2c8b3901be
6 changed files with 352 additions and 321 deletions

View File

@ -104,9 +104,8 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* Remove the lock with the given lock id. * Remove the lock with the given lock id.
* *
* @param lockId lock id * @param lockId lock id
* @return true if the lock was removed, false if the given lock id did not exist
*/ */
public boolean removeLock(long lockId); public void removeLock(long lockId);
/** /**
* Add a log to the entry with the given id. * Add a log to the entry with the given id.

View File

@ -97,11 +97,10 @@ public class MySQLConnector extends SQLMetadataConnector
} }
@Override @Override
protected boolean isTransientException(Throwable e) protected boolean connectorIsTransientException(Throwable e)
{ {
return e instanceof MySQLTransientException return e instanceof MySQLTransientException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317) || (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317 /* ER_QUERY_INTERRUPTED */);
;
} }
@Override @Override

View File

@ -118,7 +118,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
public DBI getDBI() { return dbi; } public DBI getDBI() { return dbi; }
@Override @Override
protected boolean isTransientException(Throwable e) protected boolean connectorIsTransientException(Throwable e)
{ {
if(e instanceof SQLException) { if(e instanceof SQLException) {
final String sqlState = ((SQLException) e).getSQLState(); final String sqlState = ((SQLException) e).getSQLState();

View File

@ -17,9 +17,12 @@
package io.druid.metadata; package io.druid.metadata;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.Batch;
@ -28,11 +31,17 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper; import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.IntegerMapper; import org.skife.jdbi.v2.util.IntegerMapper;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
public abstract class SQLMetadataConnector implements MetadataStorageConnector public abstract class SQLMetadataConnector implements MetadataStorageConnector
{ {
@ -41,32 +50,43 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
private final Supplier<MetadataStorageConnectorConfig> config; private final Supplier<MetadataStorageConnectorConfig> config;
private final Supplier<MetadataStorageTablesConfig> tablesConfigSupplier; private final Supplier<MetadataStorageTablesConfig> tablesConfigSupplier;
private final Predicate<Throwable> shouldRetry;
public SQLMetadataConnector(Supplier<MetadataStorageConnectorConfig> config, public SQLMetadataConnector(
Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> tablesConfigSupplier Supplier<MetadataStorageTablesConfig> tablesConfigSupplier
) )
{ {
this.config = config; this.config = config;
this.tablesConfigSupplier = tablesConfigSupplier; this.tablesConfigSupplier = tablesConfigSupplier;
this.shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return isTransientException(e);
}
};
} }
/** /**
* SQL type to use for payload data (e.g. JSON blobs). * SQL type to use for payload data (e.g. JSON blobs).
* Must be a binary type, which values can be accessed using ResultSet.getBytes() * Must be a binary type, which values can be accessed using ResultSet.getBytes()
* * <p/>
* The resulting string will be interpolated into the table creation statement, e.g. * The resulting string will be interpolated into the table creation statement, e.g.
* <code>CREATE TABLE druid_table ( payload <type> NOT NULL, ... )</code> * <code>CREATE TABLE druid_table ( payload <type> NOT NULL, ... )</code>
* *
* @return String representing the SQL type * @return String representing the SQL type
*/ */
protected String getPayloadType() { protected String getPayloadType()
{
return PAYLOAD_TYPE; return PAYLOAD_TYPE;
} }
/** /**
* Auto-incrementing SQL type to use for IDs * Auto-incrementing SQL type to use for IDs
* Must be an integer type, which values will be automatically set by the database * Must be an integer type, which values will be automatically set by the database
* * <p/>
* The resulting string will be interpolated into the table creation statement, e.g. * The resulting string will be interpolated into the table creation statement, e.g.
* <code>CREATE TABLE druid_table ( id <type> NOT NULL, ... )</code> * <code>CREATE TABLE druid_table ( id <type> NOT NULL, ... )</code>
* *
@ -78,7 +98,56 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
public abstract boolean tableExists(Handle handle, final String tableName); public abstract boolean tableExists(Handle handle, final String tableName);
protected boolean isTransientException(Throwable e) { public <T> T retryWithHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return getDBI().withHandle(callback);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public <T> T retryTransaction(final TransactionCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return getDBI().inTransaction(callback);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public final boolean isTransientException(Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| connectorIsTransientException(e)
|| (e instanceof SQLException && isTransientException(e.getCause()))
|| (e instanceof DBIException && isTransientException(e.getCause())));
}
protected boolean connectorIsTransientException(Throwable e)
{
return false; return false;
} }

View File

@ -20,12 +20,10 @@ package io.druid.metadata;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.RetryUtils;
import com.metamx.common.StringUtils; import com.metamx.common.StringUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -34,9 +32,7 @@ import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException; import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper; import org.skife.jdbi.v2.util.ByteArrayMapper;
@ -44,11 +40,8 @@ import org.skife.jdbi.v2.util.ByteArrayMapper;
import java.io.IOException; import java.io.IOException;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -99,7 +92,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
) throws EntryExistsException ) throws EntryExistsException
{ {
try { try {
retryingHandle( connector.retryWithHandle(
new HandleCallback<Void>() new HandleCallback<Void>()
{ {
@Override @Override
@ -122,7 +115,8 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
} }
} }
); );
} catch(Exception e) { }
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException || final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException (e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException); && e.getCause() instanceof StatementException);
@ -136,7 +130,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public boolean setStatus(final String entryId, final boolean active, final StatusType status) public boolean setStatus(final String entryId, final boolean active, final StatusType status)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Boolean>() new HandleCallback<Boolean>()
{ {
@Override @Override
@ -159,7 +153,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public Optional<EntryType> getEntry(final String entryId) public Optional<EntryType> getEntry(final String entryId)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Optional<EntryType>>() new HandleCallback<Optional<EntryType>>()
{ {
@Override @Override
@ -183,7 +177,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public Optional<StatusType> getStatus(final String entryId) public Optional<StatusType> getStatus(final String entryId)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Optional<StatusType>>() new HandleCallback<Optional<StatusType>>()
{ {
@Override @Override
@ -206,7 +200,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus() public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus()
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<List<Pair<EntryType, StatusType>>>() new HandleCallback<List<Pair<EntryType, StatusType>>>()
{ {
@Override @Override
@ -253,7 +247,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public List<StatusType> getInactiveStatusesSince(final DateTime timestamp) public List<StatusType> getInactiveStatusesSince(final DateTime timestamp)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<List<StatusType>>() new HandleCallback<List<StatusType>>()
{ {
@Override @Override
@ -294,7 +288,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public boolean addLock(final String entryId, final LockType lock) public boolean addLock(final String entryId, final LockType lock)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Boolean>() new HandleCallback<Boolean>()
{ {
@Override @Override
@ -314,22 +308,19 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
); );
} }
public boolean removeLock(final long lockId) public void removeLock(final long lockId)
{ {
return retryingHandle( connector.retryWithHandle(
new HandleCallback<Boolean>() new HandleCallback<Void>()
{ {
@Override @Override
public Boolean withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
return handle.createStatement( handle.createStatement(String.format("DELETE FROM %s WHERE id = :id", lockTable))
String.format(
"DELETE FROM %s WHERE id = :id",
lockTable
)
)
.bind("id", lockId) .bind("id", lockId)
.execute() == 1; .execute();
return null;
} }
} }
); );
@ -337,7 +328,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public boolean addLog(final String entryId, final LogType log) public boolean addLog(final String entryId, final LogType log)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Boolean>() new HandleCallback<Boolean>()
{ {
@Override @Override
@ -359,7 +350,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public List<LogType> getLogs(final String entryId) public List<LogType> getLogs(final String entryId)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<List<LogType>>() new HandleCallback<List<LogType>>()
{ {
@Override @Override
@ -408,7 +399,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
public Map<Long, LockType> getLocks(final String entryId) public Map<Long, LockType> getLocks(final String entryId)
{ {
return retryingHandle( return connector.retryWithHandle(
new HandleCallback<Map<Long, LockType>>() new HandleCallback<Map<Long, LockType>>()
{ {
@Override @Override
@ -470,42 +461,4 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
} }
); );
} }
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return connector.getDBI().withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
protected boolean shouldRetryException(final Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| connector.isTransientException(e)
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
}
} }

View File

@ -43,7 +43,8 @@ public class SQLMetadataStorageActionHandlerTest
private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>> handler; private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>> handler;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
connector = new TestDerbyConnector( connector = new TestDerbyConnector(
@ -70,35 +71,45 @@ public class SQLMetadataStorageActionHandlerTest
@Override @Override
public TypeReference<Map<String, Integer>> getEntryType() public TypeReference<Map<String, Integer>> getEntryType()
{ {
return new TypeReference<Map<String, Integer>>() {}; return new TypeReference<Map<String, Integer>>()
{
};
} }
@Override @Override
public TypeReference<Map<String, Integer>> getStatusType() public TypeReference<Map<String, Integer>> getStatusType()
{ {
return new TypeReference<Map<String, Integer>>() {}; return new TypeReference<Map<String, Integer>>()
{
};
} }
@Override @Override
public TypeReference<Map<String, String>> getLogType() public TypeReference<Map<String, String>> getLogType()
{ {
return new TypeReference<Map<String, String>>() {}; return new TypeReference<Map<String, String>>()
{
};
} }
@Override @Override
public TypeReference<Map<String, Integer>> getLockType() public TypeReference<Map<String, Integer>> getLockType()
{ {
return new TypeReference<Map<String, Integer>>() {}; return new TypeReference<Map<String, Integer>>()
{
};
} }
}, },
entryType, entryType,
entryTable, entryTable,
logTable, logTable,
lockTable); lockTable
);
} }
@After @After
public void tearDown() { public void tearDown()
{
connector.tearDown(); connector.tearDown();
} }
@ -240,7 +251,7 @@ public class SQLMetadataStorageActionHandlerTest
); );
long lockId = locks.keySet().iterator().next(); long lockId = locks.keySet().iterator().next();
Assert.assertTrue(handler.removeLock(lockId)); handler.removeLock(lockId);
locks.remove(lockId); locks.remove(lockId);
final Map<Long, Map<String, Integer>> updated = handler.getLocks(entryId); final Map<Long, Map<String, Integer>> updated = handler.getLocks(entryId);