From f005da6dfdf4d51b244e360abdf1906f7d45670d Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 6 Dec 2017 11:28:59 +0100 Subject: [PATCH] ARTEMIS-1541 Make the JDBC Node Manager more resilient on failures (cherry picked from commit 70b21725edae28b591b87bb4de0f51364e9cfd50) --- .../jdbc/store/sql/GenericSQLProvider.java | 9 ++ .../artemis/jdbc/store/sql/SQLProvider.java | 2 + .../impl/jdbc/ActiveMQScheduledLeaseLock.java | 9 +- .../server/impl/jdbc/JdbcNodeManager.java | 26 ++++-- .../impl/jdbc/JdbcSharedStateManager.java | 87 +++++++++++++++---- .../server/impl/jdbc/SharedStateManager.java | 1 + 6 files changed, 104 insertions(+), 30 deletions(-) diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java index ac793d34e2..c15ce18a83 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java @@ -99,6 +99,8 @@ public class GenericSQLProvider implements SQLProvider { private final String writeNodeIdSQL; + private final String initializeNodeIdSQL; + private final String readNodeIdSQL; protected final DatabaseStoreType databaseStoreType; @@ -176,6 +178,8 @@ public class GenericSQLProvider implements SQLProvider { writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID; + initializeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = " + NODE_ID_ROW_ID; + readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID; } @@ -367,6 +371,11 @@ public class GenericSQLProvider implements SQLProvider { return readNodeIdSQL; } + @Override + public String initializeNodeIdSQL() { + return initializeNodeIdSQL; + } + @Override public boolean closeConnectionOnShutdown() { return true; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java index b4b55d5c18..66af24be57 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java @@ -96,6 +96,8 @@ public interface SQLProvider { String writeNodeIdSQL(); + String initializeNodeIdSQL(); + String readNodeIdSQL(); interface Factory { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java index 4a281a213b..c5cda70ada 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java @@ -87,8 +87,13 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem public void run() { final long lastRenewStart = this.lastLockRenewStart; final long renewStart = System.nanoTime(); - if (!this.lock.renew()) { - ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null); + try { + if (!this.lock.renew()) { + ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null); + } + } catch (Throwable t) { + ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null); + throw t; } //logic to detect slowness of DB and/or the scheduled executor service detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index 8cb852d971..7bda51e196 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -111,16 +111,24 @@ public final class JdbcNodeManager extends NodeManager { } @Override - public synchronized void start() throws Exception { - if (isStarted()) { - return; + public void start() throws Exception { + try { + synchronized (this) { + if (isStarted()) { + return; + } + if (!replicatedBackup) { + final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); + setUUID(nodeId); + } + super.start(); + } + } catch (IllegalStateException e) { + if (this.ioCriticalErrorListener != null) { + this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null); + } + throw e; } - if (!replicatedBackup) { - final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); - setUUID(nodeId); - } - - super.start(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java index dad1abc61a..f1e0554b70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java @@ -27,6 +27,7 @@ import java.util.function.Supplier; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.UUID; +import org.jboss.logging.Logger; /** * JDBC implementation of a {@link SharedStateManager}. @@ -34,12 +35,15 @@ import org.apache.activemq.artemis.utils.UUID; @SuppressWarnings("SynchronizeOnNonFinalField") final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager { + private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class); + public static final int MAX_SETUP_ATTEMPTS = 20; private final String holderId; private final long lockExpirationMillis; private JdbcLeaseLock liveLock; private JdbcLeaseLock backupLock; private PreparedStatement readNodeId; private PreparedStatement writeNodeId; + private PreparedStatement initializeNodeId; private PreparedStatement readState; private PreparedStatement writeState; @@ -81,6 +85,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL()); } catch (SQLException e) { //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized + if (logger.isDebugEnabled()) { + logger.debug("Error while creating the schema of the JDBC shared state manager", e); + } } } @@ -106,6 +113,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0); this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); + this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL()); this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL()); this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); } @@ -176,34 +184,74 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS } } + private boolean rawInitializeNodeId(UUID nodeId) throws SQLException { + final PreparedStatement preparedStatement = this.initializeNodeId; + preparedStatement.setString(1, nodeId.toString()); + final int rows = preparedStatement.executeUpdate(); + assert rows <= 1; + return rows > 0; + } + @Override public UUID setup(Supplier nodeIdFactory) { - //uses a single transaction to make everything + SQLException lastError = null; synchronized (connection) { - try { - final UUID nodeId; - connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); - connection.setAutoCommit(false); + final UUID newNodeId = nodeIdFactory.get(); + for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) { + lastError = null; try { - UUID readNodeId = rawReadNodeId(); - if (readNodeId == null) { - nodeId = nodeIdFactory.get(); - rawWriteNodeId(nodeId); - } else { - nodeId = readNodeId; + final UUID nodeId = initializeOrReadNodeId(newNodeId); + if (nodeId != null) { + return nodeId; } } catch (SQLException e) { - connection.rollback(); - connection.setAutoCommit(true); - throw e; + if (logger.isDebugEnabled()) { + logger.debug("Error while attempting to setup the NodeId", e); + } + lastError = e; } - connection.commit(); - connection.setAutoCommit(true); - return nodeId; - } catch (SQLException e) { - throw new IllegalStateException(e); } } + if (lastError != null) { + logger.error("Unable to setup a NodeId on the JDBC shared state", lastError); + } else { + logger.error("Unable to setup a NodeId on the JDBC shared state"); + } + throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId"); + } + + private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException { + final UUID nodeId; + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + connection.setAutoCommit(false); + try { + //optimistic try to initialize nodeId + if (rawInitializeNodeId(newNodeId)) { + nodeId = newNodeId; + } else { + nodeId = rawReadNodeId(); + } + } catch (SQLException e) { + connection.rollback(); + connection.setAutoCommit(true); + if (logger.isDebugEnabled()) { + logger.debug("Rollback while trying to update NodeId to " + newNodeId, e); + } + return null; + } + if (nodeId != null) { + connection.commit(); + connection.setAutoCommit(true); + return nodeId; + } else { + //that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet + connection.rollback(); + connection.setAutoCommit(true); + if (logger.isDebugEnabled()) { + logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId"); + } + return null; + } } private static State decodeState(String s) { @@ -286,6 +334,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS synchronized (connection) { this.readNodeId.close(); this.writeNodeId.close(); + this.initializeNodeId.close(); this.readState.close(); this.writeState.close(); this.liveLock.close(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java index e26879c6cc..0b2d5fbd8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java @@ -46,6 +46,7 @@ interface SharedStateManager extends AutoCloseable { * * @param nodeIdFactory used to create the nodeId if needed * @return the newly created NodeId or the old one if already present + * @throws IllegalStateException if not able to setup the NodeId properly */ UUID setup(Supplier nodeIdFactory);