git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1325621 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-04-13 05:55:50 +00:00
parent 8e05b7b733
commit 0be51cbfe2
1 changed files with 30 additions and 12 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource; import javax.sql.DataSource;
@ -41,6 +42,8 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
protected Statements statements; protected Statements statements;
protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected PreparedStatement lockCreateStatement;
protected PreparedStatement lockUpdateStatement;
protected Connection connection; protected Connection connection;
protected boolean stopping; protected boolean stopping;
protected Handler<Exception> exceptionHandler; protected Handler<Exception> exceptionHandler;
@ -64,13 +67,12 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
String sql = statements.getLockCreateStatement(); String sql = statements.getLockCreateStatement();
LOG.debug("Locking Query is "+sql); LOG.debug("Locking Query is "+sql);
PreparedStatement statement = null;
while (true) { while (true) {
try { try {
connection = dataSource.getConnection(); connection = dataSource.getConnection();
connection.setAutoCommit(false); connection.setAutoCommit(false);
statement = connection.prepareStatement(sql); lockCreateStatement = connection.prepareStatement(sql);
statement.execute(); lockCreateStatement.execute();
break; break;
} catch (Exception e) { } catch (Exception e) {
try { try {
@ -110,13 +112,13 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
} }
} }
} finally { } finally {
if (null != statement) { if (null != lockCreateStatement) {
try { try {
statement.close(); lockCreateStatement.close();
} catch (SQLException e1) { } catch (SQLException e1) {
LOG.debug("Caught while closing statement: " + e1, e1); LOG.debug("Caught while closing statement: " + e1, e1);
} }
statement = null; lockCreateStatement = null;
} }
} }
@ -133,6 +135,20 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
public void stop() throws Exception { public void stop() throws Exception {
stopping = true; stopping = true;
try {
if (lockCreateStatement != null) {
lockCreateStatement.cancel();
}
} catch (SQLFeatureNotSupportedException e) {
LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
}
try {
if (lockUpdateStatement != null) {
lockUpdateStatement.cancel();
}
} catch (SQLFeatureNotSupportedException e) {
LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
}
try { try {
if (connection != null && !connection.isClosed()) { if (connection != null && !connection.isClosed()) {
try { try {
@ -145,6 +161,7 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
} catch (SQLException ignored) { } catch (SQLException ignored) {
LOG.debug("Exception while closing connection on shutdown", ignored); LOG.debug("Exception while closing connection on shutdown", ignored);
} }
lockCreateStatement = null;
} }
} }
} catch (SQLException sqle) { } catch (SQLException sqle) {
@ -153,24 +170,25 @@ public class DefaultDatabaseLocker implements DatabaseLocker {
} }
public boolean keepAlive() { public boolean keepAlive() {
PreparedStatement statement = null;
boolean result = false; boolean result = false;
try { try {
statement = connection.prepareStatement(statements.getLockUpdateStatement()); lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
statement.setLong(1, System.currentTimeMillis()); lockUpdateStatement.setLong(1, System.currentTimeMillis());
int rows = statement.executeUpdate(); lockUpdateStatement.setQueryTimeout(10);
int rows = lockUpdateStatement.executeUpdate();
if (rows == 1) { if (rows == 1) {
result=true; result=true;
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to update database lock: " + e, e); LOG.error("Failed to update database lock: " + e, e);
} finally { } finally {
if (statement != null) { if (lockUpdateStatement != null) {
try { try {
statement.close(); lockUpdateStatement.close();
} catch (SQLException e) { } catch (SQLException e) {
LOG.error("Failed to close statement",e); LOG.error("Failed to close statement",e);
} }
lockUpdateStatement = null;
} }
} }
return result; return result;