AMQ-4841: Fixed JDBC leased locker to allow being configured in any order in the XML file, being able to use the defined statemnts. Otherwise you would have had to define the locker last in the XML file. Thanks to Pat Fox for the test case.

This commit is contained in:
Claus Ibsen 2013-11-01 12:46:36 +01:00 committed by Hadrian Zbarcea
parent 8a9ea481fd
commit a38d26b419
6 changed files with 121 additions and 11 deletions

View File

@ -29,18 +29,27 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractJDBCLocker extends AbstractLocker { public abstract class AbstractJDBCLocker extends AbstractLocker {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class);
protected DataSource dataSource; protected DataSource dataSource;
protected Statements statements; private Statements statements;
protected JDBCPersistenceAdapter jdbcAdapter;
protected boolean createTablesOnStartup; protected boolean createTablesOnStartup;
protected int queryTimeout = -1; protected int queryTimeout = -1;
public void configure(PersistenceAdapter adapter) throws IOException { public void configure(PersistenceAdapter adapter) throws IOException {
if (adapter instanceof JDBCPersistenceAdapter) { if (adapter instanceof JDBCPersistenceAdapter) {
this.jdbcAdapter = (JDBCPersistenceAdapter) adapter;
this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource(); this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
this.statements = ((JDBCPersistenceAdapter) adapter).getStatements(); // we cannot get the statements (yet) as they may be configured later
} }
} }
protected Statements getStatements() {
if (statements == null && jdbcAdapter != null) {
statements = jdbcAdapter.getStatements();
}
return statements;
}
public void setDataSource(DataSource dataSource) { public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
} }
@ -94,7 +103,8 @@ public abstract class AbstractJDBCLocker extends AbstractLocker {
@Override @Override
public void preStart() { public void preStart() {
if (createTablesOnStartup) { if (createTablesOnStartup) {
String[] createStatements = this.statements.getCreateLockSchemaStatements();
String[] createStatements = getStatements().getCreateLockSchemaStatements();
Connection connection = null; Connection connection = null;
Statement statement = null; Statement statement = null;

View File

@ -44,7 +44,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
public void doStart() throws Exception { public void doStart() throws Exception {
LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
String sql = statements.getLockCreateStatement(); String sql = getStatements().getLockCreateStatement();
LOG.debug("Locking Query is "+sql); LOG.debug("Locking Query is "+sql);
while (true) { while (true) {
@ -158,7 +158,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
public boolean keepAlive() throws IOException { public boolean keepAlive() throws IOException {
boolean result = false; boolean result = false;
try { try {
lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement()); lockUpdateStatement = connection.prepareStatement(getStatements().getLockUpdateStatement());
lockUpdateStatement.setLong(1, System.currentTimeMillis()); lockUpdateStatement.setLong(1, System.currentTimeMillis());
setQueryTimeout(lockUpdateStatement); setQueryTimeout(lockUpdateStatement);
int rows = lockUpdateStatement.executeUpdate(); int rows = lockUpdateStatement.executeUpdate();

View File

@ -52,7 +52,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
} }
LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master"); LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
String sql = statements.getLeaseObtainStatement(); String sql = getStatements().getLeaseObtainStatement();
LOG.debug(getLeaseHolderId() + " locking Query is "+sql); LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
long now = 0l; long now = 0l;
@ -101,7 +101,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException { private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
PreparedStatement statement = null; PreparedStatement statement = null;
try { try {
statement = connection.prepareStatement(statements.getLeaseOwnerStatement()); statement = connection.prepareStatement(getStatements().getLeaseOwnerStatement());
ResultSet resultSet = statement.executeQuery(); ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) { while (resultSet.next()) {
LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2))); LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
@ -123,7 +123,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
} }
protected long determineTimeDifference(Connection connection) throws SQLException { protected long determineTimeDifference(Connection connection) throws SQLException {
PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime()); PreparedStatement statement = connection.prepareStatement(getStatements().getCurrentDateTime());
ResultSet resultSet = statement.executeQuery(); ResultSet resultSet = statement.executeQuery();
long result = 0l; long result = 0l;
if (resultSet.next()) { if (resultSet.next()) {
@ -151,7 +151,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
PreparedStatement statement = null; PreparedStatement statement = null;
try { try {
connection = getConnection(); connection = getConnection();
statement = connection.prepareStatement(statements.getLeaseUpdateStatement()); statement = connection.prepareStatement(getStatements().getLeaseUpdateStatement());
statement.setString(1, null); statement.setString(1, null);
statement.setLong(2, 0l); statement.setLong(2, 0l);
statement.setString(3, getLeaseHolderId()); statement.setString(3, getLeaseHolderId());
@ -169,7 +169,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
@Override @Override
public boolean keepAlive() throws IOException { public boolean keepAlive() throws IOException {
boolean result = false; boolean result = false;
final String sql = statements.getLeaseUpdateStatement(); final String sql = getStatements().getLeaseUpdateStatement();
LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql); LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
Connection connection = null; Connection connection = null;

View File

@ -45,7 +45,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
try { try {
connection = dataSource.getConnection(); connection = dataSource.getConnection();
connection.setAutoCommit(false); connection.setAutoCommit(false);
String sql = statements.getLockCreateStatement(); String sql = getStatements().getLockCreateStatement();
statement = connection.prepareStatement(sql); statement = connection.prepareStatement(sql);
if (statement.getMetaData() != null) { if (statement.getMetaData() != null) {
ResultSet rs = statement.executeQuery(); ResultSet rs = statement.executeQuery();

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
public class JDBCLockTablePrefixTest extends TestCase {
public void testLockTable() throws Exception {
BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml");
broker.waitUntilStarted();
PersistenceAdapter pa = broker.getPersistenceAdapter();
assertNotNull(pa);
JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa;
assertEquals("TTT_", jpa.getStatements().getTablePrefix());
assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName());
assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName());
broker.stop();
broker.waitUntilStopped();
}
}

View File

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- normal ActiveMQ XML config which is less verbose & can be validated -->
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
useLoggingForShutdownErrors="true" useJmx="false"
persistent="true" vmConnectorURI="vm://javacoola"
useShutdownHook="false" deleteAllMessagesOnStartup="true">
<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapter dataDirectory="target/activemq-data" dataSource="#derby-ds" lockKeepAlivePeriod="5000" createTablesOnStartup="true">
<!-- test that we can define the locker before th statements,
but the locker will still pickup the statements -->
<amq:locker>
<amq:lease-database-locker lockAcquireSleepInterval="10000"/>
</amq:locker>
<amq:statements>
<amq:statements tablePrefix="TTT_" messageTableName="AMQ_MSGS2" durableSubAcksTableName="AMQ_ACKS2" lockTableName="AMQ_LOCK2"/>
</amq:statements>
<amq:adapter>
<amq:defaultJDBCAdapter/>
</amq:adapter>
</amq:jdbcPersistenceAdapter>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="vm://brokerConfigTest"/>
</amq:transportConnectors>
</amq:broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="target/derbyDb"/>
<property name="connectionAttributes" value=";create=true"/>
</bean>
</beans>