diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java index 915c512e6d..b8ca1b988a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import javax.sql.DataSource; @@ -41,6 +42,8 @@ public class DefaultDatabaseLocker implements DatabaseLocker { protected Statements statements; protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; + protected PreparedStatement lockCreateStatement; + protected PreparedStatement lockUpdateStatement; protected Connection connection; protected boolean stopping; protected Handler exceptionHandler; @@ -64,13 +67,12 @@ public class DefaultDatabaseLocker implements DatabaseLocker { String sql = statements.getLockCreateStatement(); LOG.debug("Locking Query is "+sql); - PreparedStatement statement = null; while (true) { try { connection = dataSource.getConnection(); connection.setAutoCommit(false); - statement = connection.prepareStatement(sql); - statement.execute(); + lockCreateStatement = connection.prepareStatement(sql); + lockCreateStatement.execute(); break; } catch (Exception e) { try { @@ -110,13 +112,13 @@ public class DefaultDatabaseLocker implements DatabaseLocker { } } } finally { - if (null != statement) { + if (null != lockCreateStatement) { try { - statement.close(); + lockCreateStatement.close(); } catch (SQLException e1) { LOG.debug("Caught while closing statement: " + e1, e1); } - statement = null; + lockCreateStatement = null; } } @@ -133,6 +135,20 @@ public class DefaultDatabaseLocker implements DatabaseLocker { public void stop() throws Exception { stopping = true; + try { + if (lockCreateStatement != null) { + lockCreateStatement.cancel(); + } + } catch (SQLFeatureNotSupportedException e) { + LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e); + } + try { + if (lockUpdateStatement != null) { + lockUpdateStatement.cancel(); + } + } catch (SQLFeatureNotSupportedException e) { + LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e); + } try { if (connection != null && !connection.isClosed()) { try { @@ -145,6 +161,7 @@ public class DefaultDatabaseLocker implements DatabaseLocker { } catch (SQLException ignored) { LOG.debug("Exception while closing connection on shutdown", ignored); } + lockCreateStatement = null; } } } catch (SQLException sqle) { @@ -153,24 +170,25 @@ public class DefaultDatabaseLocker implements DatabaseLocker { } public boolean keepAlive() { - PreparedStatement statement = null; boolean result = false; try { - statement = connection.prepareStatement(statements.getLockUpdateStatement()); - statement.setLong(1, System.currentTimeMillis()); - int rows = statement.executeUpdate(); + lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement()); + lockUpdateStatement.setLong(1, System.currentTimeMillis()); + lockUpdateStatement.setQueryTimeout(10); + int rows = lockUpdateStatement.executeUpdate(); if (rows == 1) { result=true; } } catch (Exception e) { LOG.error("Failed to update database lock: " + e, e); } finally { - if (statement != null) { + if (lockUpdateStatement != null) { try { - statement.close(); + lockUpdateStatement.close(); } catch (SQLException e) { LOG.error("Failed to close statement",e); } + lockUpdateStatement = null; } } return result;