ARTEMIS-1541 Make the JDBC Node Manager more resilient on failures
(cherry picked from commit 70b21725ed
)
This commit is contained in:
parent
a0f3da5d72
commit
f005da6dfd
|
@ -99,6 +99,8 @@ public class GenericSQLProvider implements SQLProvider {
|
||||||
|
|
||||||
private final String writeNodeIdSQL;
|
private final String writeNodeIdSQL;
|
||||||
|
|
||||||
|
private final String initializeNodeIdSQL;
|
||||||
|
|
||||||
private final String readNodeIdSQL;
|
private final String readNodeIdSQL;
|
||||||
|
|
||||||
protected final DatabaseStoreType databaseStoreType;
|
protected final DatabaseStoreType databaseStoreType;
|
||||||
|
@ -176,6 +178,8 @@ public class GenericSQLProvider implements SQLProvider {
|
||||||
|
|
||||||
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
|
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
|
||||||
|
|
||||||
|
initializeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = " + NODE_ID_ROW_ID;
|
||||||
|
|
||||||
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
|
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -367,6 +371,11 @@ public class GenericSQLProvider implements SQLProvider {
|
||||||
return readNodeIdSQL;
|
return readNodeIdSQL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String initializeNodeIdSQL() {
|
||||||
|
return initializeNodeIdSQL;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean closeConnectionOnShutdown() {
|
public boolean closeConnectionOnShutdown() {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -96,6 +96,8 @@ public interface SQLProvider {
|
||||||
|
|
||||||
String writeNodeIdSQL();
|
String writeNodeIdSQL();
|
||||||
|
|
||||||
|
String initializeNodeIdSQL();
|
||||||
|
|
||||||
String readNodeIdSQL();
|
String readNodeIdSQL();
|
||||||
|
|
||||||
interface Factory {
|
interface Factory {
|
||||||
|
|
|
@ -87,9 +87,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
|
||||||
public void run() {
|
public void run() {
|
||||||
final long lastRenewStart = this.lastLockRenewStart;
|
final long lastRenewStart = this.lastLockRenewStart;
|
||||||
final long renewStart = System.nanoTime();
|
final long renewStart = System.nanoTime();
|
||||||
|
try {
|
||||||
if (!this.lock.renew()) {
|
if (!this.lock.renew()) {
|
||||||
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
|
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
|
||||||
}
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
//logic to detect slowness of DB and/or the scheduled executor service
|
//logic to detect slowness of DB and/or the scheduled executor service
|
||||||
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
|
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
|
||||||
this.lastLockRenewStart = renewStart;
|
this.lastLockRenewStart = renewStart;
|
||||||
|
|
|
@ -111,7 +111,9 @@ public final class JdbcNodeManager extends NodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
try {
|
||||||
|
synchronized (this) {
|
||||||
if (isStarted()) {
|
if (isStarted()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -119,9 +121,15 @@ public final class JdbcNodeManager extends NodeManager {
|
||||||
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
|
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
|
||||||
setUUID(nodeId);
|
setUUID(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
} catch (IllegalStateException e) {
|
||||||
|
if (this.ioCriticalErrorListener != null) {
|
||||||
|
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stop() throws Exception {
|
public synchronized void stop() throws Exception {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.function.Supplier;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.UUID;
|
import org.apache.activemq.artemis.utils.UUID;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JDBC implementation of a {@link SharedStateManager}.
|
* JDBC implementation of a {@link SharedStateManager}.
|
||||||
|
@ -34,12 +35,15 @@ import org.apache.activemq.artemis.utils.UUID;
|
||||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||||
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
|
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
|
||||||
|
public static final int MAX_SETUP_ATTEMPTS = 20;
|
||||||
private final String holderId;
|
private final String holderId;
|
||||||
private final long lockExpirationMillis;
|
private final long lockExpirationMillis;
|
||||||
private JdbcLeaseLock liveLock;
|
private JdbcLeaseLock liveLock;
|
||||||
private JdbcLeaseLock backupLock;
|
private JdbcLeaseLock backupLock;
|
||||||
private PreparedStatement readNodeId;
|
private PreparedStatement readNodeId;
|
||||||
private PreparedStatement writeNodeId;
|
private PreparedStatement writeNodeId;
|
||||||
|
private PreparedStatement initializeNodeId;
|
||||||
private PreparedStatement readState;
|
private PreparedStatement readState;
|
||||||
private PreparedStatement writeState;
|
private PreparedStatement writeState;
|
||||||
|
|
||||||
|
@ -81,6 +85,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
||||||
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
|
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
|
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Error while creating the schema of the JDBC shared state manager", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,6 +113,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
||||||
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
|
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
|
||||||
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
|
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
|
||||||
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
|
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
|
||||||
|
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
|
||||||
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
|
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
|
||||||
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
|
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
|
||||||
}
|
}
|
||||||
|
@ -176,33 +184,73 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean rawInitializeNodeId(UUID nodeId) throws SQLException {
|
||||||
|
final PreparedStatement preparedStatement = this.initializeNodeId;
|
||||||
|
preparedStatement.setString(1, nodeId.toString());
|
||||||
|
final int rows = preparedStatement.executeUpdate();
|
||||||
|
assert rows <= 1;
|
||||||
|
return rows > 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
|
public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
|
||||||
//uses a single transaction to make everything
|
SQLException lastError = null;
|
||||||
synchronized (connection) {
|
synchronized (connection) {
|
||||||
|
final UUID newNodeId = nodeIdFactory.get();
|
||||||
|
for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
|
||||||
|
lastError = null;
|
||||||
try {
|
try {
|
||||||
|
final UUID nodeId = initializeOrReadNodeId(newNodeId);
|
||||||
|
if (nodeId != null) {
|
||||||
|
return nodeId;
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Error while attempting to setup the NodeId", e);
|
||||||
|
}
|
||||||
|
lastError = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (lastError != null) {
|
||||||
|
logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
|
||||||
|
} else {
|
||||||
|
logger.error("Unable to setup a NodeId on the JDBC shared state");
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
|
||||||
final UUID nodeId;
|
final UUID nodeId;
|
||||||
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
||||||
connection.setAutoCommit(false);
|
connection.setAutoCommit(false);
|
||||||
try {
|
try {
|
||||||
UUID readNodeId = rawReadNodeId();
|
//optimistic try to initialize nodeId
|
||||||
if (readNodeId == null) {
|
if (rawInitializeNodeId(newNodeId)) {
|
||||||
nodeId = nodeIdFactory.get();
|
nodeId = newNodeId;
|
||||||
rawWriteNodeId(nodeId);
|
|
||||||
} else {
|
} else {
|
||||||
nodeId = readNodeId;
|
nodeId = rawReadNodeId();
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
connection.rollback();
|
connection.rollback();
|
||||||
connection.setAutoCommit(true);
|
connection.setAutoCommit(true);
|
||||||
throw e;
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (nodeId != null) {
|
||||||
connection.commit();
|
connection.commit();
|
||||||
connection.setAutoCommit(true);
|
connection.setAutoCommit(true);
|
||||||
return nodeId;
|
return nodeId;
|
||||||
} catch (SQLException e) {
|
} else {
|
||||||
throw new IllegalStateException(e);
|
//that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
|
||||||
|
connection.rollback();
|
||||||
|
connection.setAutoCommit(true);
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,6 +334,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
|
||||||
synchronized (connection) {
|
synchronized (connection) {
|
||||||
this.readNodeId.close();
|
this.readNodeId.close();
|
||||||
this.writeNodeId.close();
|
this.writeNodeId.close();
|
||||||
|
this.initializeNodeId.close();
|
||||||
this.readState.close();
|
this.readState.close();
|
||||||
this.writeState.close();
|
this.writeState.close();
|
||||||
this.liveLock.close();
|
this.liveLock.close();
|
||||||
|
|
|
@ -46,6 +46,7 @@ interface SharedStateManager extends AutoCloseable {
|
||||||
*
|
*
|
||||||
* @param nodeIdFactory used to create the nodeId if needed
|
* @param nodeIdFactory used to create the nodeId if needed
|
||||||
* @return the newly created NodeId or the old one if already present
|
* @return the newly created NodeId or the old one if already present
|
||||||
|
* @throws IllegalStateException if not able to setup the NodeId properly
|
||||||
*/
|
*/
|
||||||
UUID setup(Supplier<? extends UUID> nodeIdFactory);
|
UUID setup(Supplier<? extends UUID> nodeIdFactory);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue