diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java index 8006c3be662..ebf6dcdb3ef 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java @@ -104,9 +104,8 @@ public interface MetadataStorageActionHandler config; private final Supplier tablesConfigSupplier; + private final Predicate shouldRetry; - public SQLMetadataConnector(Supplier config, - Supplier tablesConfigSupplier + public SQLMetadataConnector( + Supplier config, + Supplier tablesConfigSupplier ) { this.config = config; this.tablesConfigSupplier = tablesConfigSupplier; + this.shouldRetry = new Predicate() + { + @Override + public boolean apply(Throwable e) + { + return isTransientException(e); + } + }; } /** * SQL type to use for payload data (e.g. JSON blobs). * Must be a binary type, which values can be accessed using ResultSet.getBytes() - * + *

   * The resulting string will be interpolated into the table creation statement, e.g.
   * CREATE TABLE druid_table ( payload NOT NULL, ... )
   *
   * @return String representing the SQL type
   */
-  protected String getPayloadType() {
+  protected String getPayloadType()
+  {
     return PAYLOAD_TYPE;
   }
 
   /**
    * Auto-incrementing SQL type to use for IDs
    * Must be an integer type, which values will be automatically set by the database
-   *
+   *

* The resulting string will be interpolated into the table creation statement, e.g. * CREATE TABLE druid_table ( id NOT NULL, ... ) * @@ -78,7 +98,56 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector public abstract boolean tableExists(Handle handle, final String tableName); - protected boolean isTransientException(Throwable e) { + public T retryWithHandle(final HandleCallback callback) + { + final Callable call = new Callable() + { + @Override + public T call() throws Exception + { + return getDBI().withHandle(callback); + } + }; + final int maxTries = 10; + try { + return RetryUtils.retry(call, shouldRetry, maxTries); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + public T retryTransaction(final TransactionCallback callback) + { + final Callable call = new Callable() + { + @Override + public T call() throws Exception + { + return getDBI().inTransaction(callback); + } + }; + final int maxTries = 10; + try { + return RetryUtils.retry(call, shouldRetry, maxTries); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + public final boolean isTransientException(Throwable e) + { + return e != null && (e instanceof SQLTransientException + || e instanceof SQLRecoverableException + || e instanceof UnableToObtainConnectionException + || connectorIsTransientException(e) + || (e instanceof SQLException && isTransientException(e.getCause())) + || (e instanceof DBIException && isTransientException(e.getCause()))); + } + + protected boolean connectorIsTransientException(Throwable e) + { return false; } @@ -94,7 +163,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector if (!tableExists(handle, tableName)) { log.info("Creating table[%s]", tableName); final Batch batch = handle.createBatch(); - for(String s : sql) { + for (String s : sql) { batch.add(s); } batch.execute(); diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java index 271495790c6..5cbcca5c89f 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java @@ -20,12 +20,10 @@ package io.druid.metadata; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.metamx.common.Pair; -import com.metamx.common.RetryUtils; import com.metamx.common.StringUtils; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; @@ -34,9 +32,7 @@ import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.exceptions.StatementException; -import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -44,11 +40,8 @@ import org.skife.jdbi.v2.util.ByteArrayMapper; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.SQLRecoverableException; -import java.sql.SQLTransientException; import 
java.util.List; import java.util.Map; -import java.util.concurrent.Callable; public class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler @@ -99,7 +92,7 @@ public class SQLMetadataStorageActionHandler() { @Override @@ -122,7 +115,8 @@ public class SQLMetadataStorageActionHandler() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - return handle.createStatement( - String.format( - "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = TRUE", - entryTable - ) - ) - .bind("id", entryId) - .bind("active", active) - .bind("status_payload", jsonMapper.writeValueAsBytes(status)) - .execute() == 1; - } - } - ); + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = TRUE", + entryTable + ) + ) + .bind("id", entryId) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute() == 1; + } + } + ); } public Optional getEntry(final String entryId) { - return retryingHandle( + return connector.retryWithHandle( new HandleCallback>() { @Override @@ -183,7 +177,7 @@ public class SQLMetadataStorageActionHandler getStatus(final String entryId) { - return retryingHandle( + return connector.retryWithHandle( new HandleCallback>() { @Override @@ -206,306 +200,265 @@ public class SQLMetadataStorageActionHandler> getActiveEntriesWithStatus() { - return retryingHandle( - new HandleCallback>>() - { - @Override - public List> withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback>>() { - return handle - .createQuery( - String.format( - "SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date", - entryTable - ) - ) - .map( - new ResultSetMapper>() - { - @Override - public Pair map(int index, ResultSet r, StatementContext ctx) - throws SQLException + @Override + public List> withHandle(Handle handle) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date", + entryTable + ) + ) + .map( + new ResultSetMapper>() { - try { - return Pair.of( - jsonMapper.readValue( - r.getBytes("payload"), - entryType - ), - jsonMapper.readValue( - r.getBytes("status_payload"), - statusType - ) - ); - } - catch (IOException e) { - log.makeAlert(e, "Failed to parse entry payload").addData("entry", r.getString("id")).emit(); - throw new SQLException(e); + @Override + public Pair map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + jsonMapper.readValue( + r.getBytes("payload"), + entryType + ), + jsonMapper.readValue( + r.getBytes("status_payload"), + statusType + ) + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to parse entry payload").addData("entry", r.getString("id")).emit(); + throw new SQLException(e); + } } } - } - ).list(); + ).list(); + } } - } ); } public List getInactiveStatusesSince(final DateTime timestamp) { - return retryingHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback>() { - return handle - .createQuery( - String.format( - "SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY 
created_date DESC", - entryTable - ) - ).bind("start", timestamp.toString()) - .map( - new ResultSetMapper() - { - @Override - public StatusType map(int index, ResultSet r, StatementContext ctx) throws SQLException + @Override + public List withHandle(Handle handle) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC", + entryTable + ) + ).bind("start", timestamp.toString()) + .map( + new ResultSetMapper() { - try { - return jsonMapper.readValue( - r.getBytes("status_payload"), - statusType - ); - } - catch (IOException e) { - log.makeAlert(e, "Failed to parse status payload") - .addData("entry", r.getString("id")) - .emit(); - throw new SQLException(e); + @Override + public StatusType map(int index, ResultSet r, StatementContext ctx) throws SQLException + { + try { + return jsonMapper.readValue( + r.getBytes("status_payload"), + statusType + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to parse status payload") + .addData("entry", r.getString("id")) + .emit(); + throw new SQLException(e); + } } } - } - ).list(); + ).list(); + } } - } ); } public boolean addLock(final String entryId, final LockType lock) { - return retryingHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback() { - return handle.createStatement( - String.format( - "INSERT INTO %1$s (%2$s_id, lock_payload) VALUES (:entryId, :payload)", - lockTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .bind("payload", jsonMapper.writeValueAsBytes(lock)) - .execute() == 1; + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "INSERT INTO %1$s (%2$s_id, lock_payload) VALUES (:entryId, :payload)", + lockTable, entryTypeName + ) + ) + .bind("entryId", entryId) + .bind("payload", jsonMapper.writeValueAsBytes(lock)) + .execute() == 1; + } } - } ); } - public boolean removeLock(final long lockId) + public void removeLock(final long lockId) { - return retryingHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception + connector.retryWithHandle( + new HandleCallback() { - return handle.createStatement( - String.format( - "DELETE FROM %s WHERE id = :id", - lockTable - ) - ) - .bind("id", lockId) - .execute() == 1; + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement(String.format("DELETE FROM %s WHERE id = :id", lockTable)) + .bind("id", lockId) + .execute(); + + return null; + } } - } ); } public boolean addLog(final String entryId, final LogType log) { - return retryingHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback() { - return handle.createStatement( - String.format( - "INSERT INTO %1$s (%2$s_id, log_payload) VALUES (:entryId, :payload)", - logTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .bind("payload", jsonMapper.writeValueAsBytes(log)) - .execute() == 1; + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format( + "INSERT INTO %1$s (%2$s_id, log_payload) VALUES (:entryId, :payload)", + logTable, entryTypeName + ) + ) + .bind("entryId", entryId) + .bind("payload", jsonMapper.writeValueAsBytes(log)) + .execute() == 1; + } 
} - } ); } public List getLogs(final String entryId) { - return retryingHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback>() { - return handle - .createQuery( - String.format( - "SELECT log_payload FROM %1$s WHERE %2$s_id = :entryId", - logTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .map(ByteArrayMapper.FIRST) - .fold( - Lists.newLinkedList(), - new Folder3, byte[]>() - { - @Override - public List fold( - List list, byte[] bytes, FoldController control, StatementContext ctx - ) throws SQLException + @Override + public List withHandle(Handle handle) throws Exception + { + return handle + .createQuery( + String.format( + "SELECT log_payload FROM %1$s WHERE %2$s_id = :entryId", + logTable, entryTypeName + ) + ) + .bind("entryId", entryId) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newLinkedList(), + new Folder3, byte[]>() { - try { - list.add( - jsonMapper.readValue( - bytes, logType - ) - ); - return list; - } - catch (IOException e) { - log.makeAlert(e, "Failed to deserialize log") - .addData("entryId", entryId) - .addData("payload", StringUtils.fromUtf8(bytes)) - .emit(); - throw new SQLException(e); + @Override + public List fold( + List list, byte[] bytes, FoldController control, StatementContext ctx + ) throws SQLException + { + try { + list.add( + jsonMapper.readValue( + bytes, logType + ) + ); + return list; + } + catch (IOException e) { + log.makeAlert(e, "Failed to deserialize log") + .addData("entryId", entryId) + .addData("payload", StringUtils.fromUtf8(bytes)) + .emit(); + throw new SQLException(e); + } } } - } - ); + ); + } } - } ); } public Map getLocks(final String entryId) { - return retryingHandle( - new HandleCallback>() - { - @Override - public Map withHandle(Handle handle) throws Exception + return connector.retryWithHandle( + new HandleCallback>() { - return handle.createQuery( - String.format( - "SELECT id, lock_payload FROM %1$s WHERE %2$s_id = :entryId", - lockTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .map( - new ResultSetMapper>() - { - @Override - public Pair map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - try { - return Pair.of( - r.getLong("id"), - jsonMapper.readValue( - r.getBytes("lock_payload"), - lockType - ) - ); - } - catch (IOException e) { - log.makeAlert(e, "Failed to deserialize " + lockType.getType()) - .addData("id", r.getLong("id")) - .addData( - "lockPayload", StringUtils.fromUtf8(r.getBytes("lock_payload")) - ) - .emit(); - throw new SQLException(e); - } - } - } - ) - .fold( - Maps.newLinkedHashMap(), - new Folder3, Pair>() - { - @Override - public Map fold( - Map accumulator, - Pair lock, - FoldController control, - StatementContext ctx - ) throws SQLException - { - accumulator.put(lock.lhs, lock.rhs); - return accumulator; - } - } - ); + @Override + public Map withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format( + "SELECT id, lock_payload FROM %1$s WHERE %2$s_id = :entryId", + lockTable, entryTypeName + ) + ) + .bind("entryId", entryId) + .map( + new ResultSetMapper>() + { + @Override + public Pair map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + r.getLong("id"), + jsonMapper.readValue( + r.getBytes("lock_payload"), + lockType + ) + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to deserialize " + lockType.getType()) + .addData("id", r.getLong("id")) + .addData( + 
"lockPayload", StringUtils.fromUtf8(r.getBytes("lock_payload")) + ) + .emit(); + throw new SQLException(e); + } + } + } + ) + .fold( + Maps.newLinkedHashMap(), + new Folder3, Pair>() + { + @Override + public Map fold( + Map accumulator, + Pair lock, + FoldController control, + StatementContext ctx + ) throws SQLException + { + accumulator.put(lock.lhs, lock.rhs); + return accumulator; + } + } + ); + } } - } ); } - - private T retryingHandle(final HandleCallback callback) - { - final Callable call = new Callable() - { - @Override - public T call() throws Exception - { - return connector.getDBI().withHandle(callback); - } - }; - final Predicate shouldRetry = new Predicate() - { - @Override - public boolean apply(Throwable e) - { - return shouldRetryException(e); - } - }; - final int maxTries = 10; - try { - return RetryUtils.retry(call, shouldRetry, maxTries); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - protected boolean shouldRetryException(final Throwable e) - { - return e != null && (e instanceof SQLTransientException - || connector.isTransientException(e) - || e instanceof SQLRecoverableException - || e instanceof UnableToObtainConnectionException - || (e instanceof SQLException && shouldRetryException(e.getCause())) - || (e instanceof DBIException && shouldRetryException(e.getCause()))); - } - } diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java index e9f07881849..0d2514fda4f 100644 --- a/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/io/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -40,11 +40,12 @@ public class SQLMetadataStorageActionHandlerTest private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private TestDerbyConnector connector; private MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test"); - private SQLMetadataStorageActionHandler,Map,Map,Map> handler; + private SQLMetadataStorageActionHandler, Map, Map, Map> handler; @Before - public void setUp() throws Exception { - MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); + public void setUp() throws Exception + { + MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); connector = new TestDerbyConnector( Suppliers.ofInstance(config), @@ -67,38 +68,48 @@ public class SQLMetadataStorageActionHandlerTest jsonMapper, new MetadataStorageActionHandlerTypes, Map, Map, Map>() { - @Override - public TypeReference> getEntryType() - { - return new TypeReference>() {}; - } + @Override + public TypeReference> getEntryType() + { + return new TypeReference>() + { + }; + } - @Override - public TypeReference> getStatusType() - { - return new TypeReference>() {}; - } + @Override + public TypeReference> getStatusType() + { + return new TypeReference>() + { + }; + } - @Override - public TypeReference> getLogType() - { - return new TypeReference>() {}; - } + @Override + public TypeReference> getLogType() + { + return new TypeReference>() + { + }; + } - @Override - public TypeReference> getLockType() - { - return new TypeReference>() {}; - } - }, + @Override + public TypeReference> getLockType() + { + return new TypeReference>() + { + }; + } + }, entryType, entryTable, logTable, - lockTable); + lockTable + ); } @After - public void tearDown() { + public void tearDown() + { connector.tearDown(); } @@ -240,7 +251,7 @@ public 
class SQLMetadataStorageActionHandlerTest
     );
     long lockId = locks.keySet().iterator().next();
 
-    Assert.assertTrue(handler.removeLock(lockId));
+    handler.removeLock(lockId);
     locks.remove(lockId);
 
     final Map> updated = handler.getLocks(entryId);
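
The core of this patch is moving the retry machinery out of SQLMetadataStorageActionHandler and into SQLMetadataConnector, so every connector-backed call site shares one shouldRetry predicate and one transient-exception check. Below is a minimal, standalone sketch of that retry-with-predicate pattern. It is not Druid's RetryUtils (which the patch calls with maxTries = 10); the class and method names are illustrative only, and java.util.function.Predicate stands in for the Guava Predicate used in the diff.

import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetrySketch
{
  // Re-invoke the callable until it succeeds, the predicate says the failure is not
  // transient, or the attempt budget is exhausted.
  public static <T> T retryWithPredicate(Callable<T> call, Predicate<Throwable> shouldRetry, int maxTries)
      throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries || !shouldRetry.test(e)) {
          throw e;
        }
        // A real implementation (e.g. RetryUtils) would also back off between attempts.
      }
    }
  }

  // Mirrors the cause-walking transient check the patch centralizes in the connector:
  // transient/recoverable SQL exceptions are retried, including when wrapped in a
  // plain SQLException.
  public static boolean isTransient(Throwable e)
  {
    return e != null
           && (e instanceof SQLTransientException
               || e instanceof SQLRecoverableException
               || (e instanceof SQLException && isTransient(e.getCause())));
  }

  public static void main(String[] args) throws Exception
  {
    final int[] calls = {0};
    // Hypothetical usage: an operation that fails transiently twice, then succeeds.
    String result = retryWithPredicate(
        () -> {
          if (++calls[0] < 3) {
            throw new SQLTransientException("simulated transient failure");
          }
          return "ok after " + calls[0] + " tries";
        },
        RetrySketch::isTransient,
        10  // same attempt budget the patch uses in retryWithHandle()/retryTransaction()
    );
    System.out.println(result);
  }
}

In the patch itself, retryWithHandle() and retryTransaction() wrap getDBI().withHandle(...) and getDBI().inTransaction(...) in exactly this way, using com.metamx.common.RetryUtils and a Guava Predicate built from isTransientException().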
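
isTransientException() in the patch is final and covers the generic JDBC/JDBI cases (SQLTransientException, SQLRecoverableException, UnableToObtainConnectionException, and exceptions that wrap them), while connectorIsTransientException() is the hook a dialect-specific connector can override. The sketch below shows the kind of SQLState-based check such an override might perform; it is an assumption for illustration only, not code from Druid's actual MySQL or PostgreSQL connectors, which may key off driver-specific exception classes instead.

import java.sql.SQLException;

public class DialectTransientCheck
{
  // Illustrative vendor-neutral check: SQLState class "08" covers connection errors,
  // and "40001" is the standard serialization-failure/deadlock state.
  public static boolean looksTransient(Throwable e)
  {
    if (!(e instanceof SQLException)) {
      return false;
    }
    final String sqlState = ((SQLException) e).getSQLState();
    return sqlState != null && (sqlState.startsWith("08") || "40001".equals(sqlState));
  }

  public static void main(String[] args)
  {
    System.out.println(looksTransient(new SQLException("deadlock detected", "40001"))); // true
    System.out.println(looksTransient(new SQLException("syntax error", "42601")));      // false
  }
}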