ARTEMIS-1541 Make the JDBC Node Manager more resilient on failures

In order to make the JDBC Node Manager more resilient has been implemented:
- recovering with fixed number of retries during the NodeId setup + unrecoverable failure otherwise
- unrecoverable fail on exceptions while renewing a lease lock

In addition, in different parts of these critical processes are added more log informations to help diagnose.
This commit is contained in:
Francesco Nigro 2017-12-06 11:28:59 +01:00 committed by Clebert Suconic
parent d609e36f68
commit 170e89f9fc
6 changed files with 104 additions and 30 deletions

View File

@ -99,6 +99,8 @@ public class GenericSQLProvider implements SQLProvider {
private final String writeNodeIdSQL; private final String writeNodeIdSQL;
private final String initializeNodeIdSQL;
private final String readNodeIdSQL; private final String readNodeIdSQL;
protected final DatabaseStoreType databaseStoreType; protected final DatabaseStoreType databaseStoreType;
@ -176,6 +178,8 @@ public class GenericSQLProvider implements SQLProvider {
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID; writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
initializeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = " + NODE_ID_ROW_ID;
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID; readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
} }
@ -367,6 +371,11 @@ public class GenericSQLProvider implements SQLProvider {
return readNodeIdSQL; return readNodeIdSQL;
} }
@Override
public String initializeNodeIdSQL() {
return initializeNodeIdSQL;
}
@Override @Override
public boolean closeConnectionOnShutdown() { public boolean closeConnectionOnShutdown() {
return true; return true;

View File

@ -96,6 +96,8 @@ public interface SQLProvider {
String writeNodeIdSQL(); String writeNodeIdSQL();
String initializeNodeIdSQL();
String readNodeIdSQL(); String readNodeIdSQL();
interface Factory { interface Factory {

View File

@ -87,8 +87,13 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
public void run() { public void run() {
final long lastRenewStart = this.lastLockRenewStart; final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime(); final long renewStart = System.nanoTime();
if (!this.lock.renew()) { try {
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null); if (!this.lock.renew()) {
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
}
} catch (Throwable t) {
ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
throw t;
} }
//logic to detect slowness of DB and/or the scheduled executor service //logic to detect slowness of DB and/or the scheduled executor service
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis()); detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());

View File

@ -111,16 +111,24 @@ public final class JdbcNodeManager extends NodeManager {
} }
@Override @Override
public synchronized void start() throws Exception { public void start() throws Exception {
if (isStarted()) { try {
return; synchronized (this) {
if (isStarted()) {
return;
}
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
super.start();
}
} catch (IllegalStateException e) {
if (this.ioCriticalErrorListener != null) {
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
}
throw e;
} }
if (!replicatedBackup) {
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
}
super.start();
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
/** /**
* JDBC implementation of a {@link SharedStateManager}. * JDBC implementation of a {@link SharedStateManager}.
@ -34,12 +35,15 @@ import org.apache.activemq.artemis.utils.UUID;
@SuppressWarnings("SynchronizeOnNonFinalField") @SuppressWarnings("SynchronizeOnNonFinalField")
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager { final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
public static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId; private final String holderId;
private final long lockExpirationMillis; private final long lockExpirationMillis;
private JdbcLeaseLock liveLock; private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock; private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId; private PreparedStatement readNodeId;
private PreparedStatement writeNodeId; private PreparedStatement writeNodeId;
private PreparedStatement initializeNodeId;
private PreparedStatement readState; private PreparedStatement readState;
private PreparedStatement writeState; private PreparedStatement writeState;
@ -81,6 +85,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL()); createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
} catch (SQLException e) { } catch (SQLException e) {
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
if (logger.isDebugEnabled()) {
logger.debug("Error while creating the schema of the JDBC shared state manager", e);
}
} }
} }
@ -106,6 +113,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0); this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL()); this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
} }
@ -176,34 +184,74 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
} }
} }
private boolean rawInitializeNodeId(UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.initializeNodeId;
preparedStatement.setString(1, nodeId.toString());
final int rows = preparedStatement.executeUpdate();
assert rows <= 1;
return rows > 0;
}
@Override @Override
public UUID setup(Supplier<? extends UUID> nodeIdFactory) { public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
//uses a single transaction to make everything SQLException lastError = null;
synchronized (connection) { synchronized (connection) {
try { final UUID newNodeId = nodeIdFactory.get();
final UUID nodeId; for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); lastError = null;
connection.setAutoCommit(false);
try { try {
UUID readNodeId = rawReadNodeId(); final UUID nodeId = initializeOrReadNodeId(newNodeId);
if (readNodeId == null) { if (nodeId != null) {
nodeId = nodeIdFactory.get(); return nodeId;
rawWriteNodeId(nodeId);
} else {
nodeId = readNodeId;
} }
} catch (SQLException e) { } catch (SQLException e) {
connection.rollback(); if (logger.isDebugEnabled()) {
connection.setAutoCommit(true); logger.debug("Error while attempting to setup the NodeId", e);
throw e; }
lastError = e;
} }
connection.commit();
connection.setAutoCommit(true);
return nodeId;
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
if (lastError != null) {
logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
} else {
logger.error("Unable to setup a NodeId on the JDBC shared state");
}
throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
}
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
final UUID nodeId;
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
connection.setAutoCommit(false);
try {
//optimistic try to initialize nodeId
if (rawInitializeNodeId(newNodeId)) {
nodeId = newNodeId;
} else {
nodeId = rawReadNodeId();
}
} catch (SQLException e) {
connection.rollback();
connection.setAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
}
return null;
}
if (nodeId != null) {
connection.commit();
connection.setAutoCommit(true);
return nodeId;
} else {
//that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
connection.rollback();
connection.setAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
}
return null;
}
} }
private static State decodeState(String s) { private static State decodeState(String s) {
@ -286,6 +334,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) { synchronized (connection) {
this.readNodeId.close(); this.readNodeId.close();
this.writeNodeId.close(); this.writeNodeId.close();
this.initializeNodeId.close();
this.readState.close(); this.readState.close();
this.writeState.close(); this.writeState.close();
this.liveLock.close(); this.liveLock.close();

View File

@ -46,6 +46,7 @@ interface SharedStateManager extends AutoCloseable {
* *
* @param nodeIdFactory used to create the nodeId if needed * @param nodeIdFactory used to create the nodeId if needed
* @return the newly created NodeId or the old one if already present * @return the newly created NodeId or the old one if already present
* @throws IllegalStateException if not able to setup the NodeId properly
*/ */
UUID setup(Supplier<? extends UUID> nodeIdFactory); UUID setup(Supplier<? extends UUID> nodeIdFactory);