AMQ-4841: Fixed JDBC leased locker to allow being configured in any order in the XML file, being able to use the defined statemnts. Otherwise you would have had to define the locker last in the XML file. Thanks to Pat Fox for the test case.

This commit is contained in:
Claus Ibsen 2013-11-01 12:46:36 +01:00
parent 855419359c
commit 8a8fcb6ef4
6 changed files with 121 additions and 11 deletions

View File

@ -29,18 +29,27 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractJDBCLocker extends AbstractLocker {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class);
protected DataSource dataSource;
protected Statements statements;
private Statements statements;
protected JDBCPersistenceAdapter jdbcAdapter;
protected boolean createTablesOnStartup;
protected int queryTimeout = -1;
public void configure(PersistenceAdapter adapter) throws IOException {
if (adapter instanceof JDBCPersistenceAdapter) {
this.jdbcAdapter = (JDBCPersistenceAdapter) adapter;
this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
// we cannot get the statements (yet) as they may be configured later
}
}
protected Statements getStatements() {
if (statements == null && jdbcAdapter != null) {
statements = jdbcAdapter.getStatements();
}
return statements;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
@ -94,7 +103,8 @@ public abstract class AbstractJDBCLocker extends AbstractLocker {
@Override
public void preStart() {
if (createTablesOnStartup) {
String[] createStatements = this.statements.getCreateLockSchemaStatements();
String[] createStatements = getStatements().getCreateLockSchemaStatements();
Connection connection = null;
Statement statement = null;

View File

@ -44,7 +44,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
public void doStart() throws Exception {
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
String sql = statements.getLockCreateStatement();
String sql = getStatements().getLockCreateStatement();
LOG.debug("Locking Query is "+sql);
while (true) {
@ -158,7 +158,7 @@ public class DefaultDatabaseLocker extends AbstractJDBCLocker {
public boolean keepAlive() throws IOException {
boolean result = false;
try {
lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
lockUpdateStatement = connection.prepareStatement(getStatements().getLockUpdateStatement());
lockUpdateStatement.setLong(1, System.currentTimeMillis());
setQueryTimeout(lockUpdateStatement);
int rows = lockUpdateStatement.executeUpdate();

View File

@ -52,7 +52,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
}
LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
String sql = statements.getLeaseObtainStatement();
String sql = getStatements().getLeaseObtainStatement();
LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
long now = 0l;
@ -101,7 +101,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
statement = connection.prepareStatement(getStatements().getLeaseOwnerStatement());
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
@ -123,7 +123,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
}
protected long determineTimeDifference(Connection connection) throws SQLException {
PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
PreparedStatement statement = connection.prepareStatement(getStatements().getCurrentDateTime());
ResultSet resultSet = statement.executeQuery();
long result = 0l;
if (resultSet.next()) {
@ -151,7 +151,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
PreparedStatement statement = null;
try {
connection = getConnection();
statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
statement = connection.prepareStatement(getStatements().getLeaseUpdateStatement());
statement.setString(1, null);
statement.setLong(2, 0l);
statement.setString(3, getLeaseHolderId());
@ -169,7 +169,7 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker {
@Override
public boolean keepAlive() throws IOException {
boolean result = false;
final String sql = statements.getLeaseUpdateStatement();
final String sql = getStatements().getLeaseUpdateStatement();
LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
Connection connection = null;

View File

@ -45,7 +45,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = statements.getLockCreateStatement();
String sql = getStatements().getLockCreateStatement();
statement = connection.prepareStatement(sql);
if (statement.getMetaData() != null) {
ResultSet rs = statement.executeQuery();

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
public class JDBCLockTablePrefixTest extends TestCase {
public void testLockTable() throws Exception {
BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml");
broker.waitUntilStarted();
PersistenceAdapter pa = broker.getPersistenceAdapter();
assertNotNull(pa);
JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa;
assertEquals("TTT_", jpa.getStatements().getTablePrefix());
assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName());
assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName());
broker.stop();
broker.waitUntilStopped();
}
}

View File

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- normal ActiveMQ XML config which is less verbose & can be validated -->
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
useLoggingForShutdownErrors="true" useJmx="false"
persistent="true" vmConnectorURI="vm://javacoola"
useShutdownHook="false" deleteAllMessagesOnStartup="true">
<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapter dataDirectory="target/activemq-data" dataSource="#derby-ds" lockKeepAlivePeriod="5000" createTablesOnStartup="true">
<!-- test that we can define the locker before th statements,
but the locker will still pickup the statements -->
<amq:locker>
<amq:lease-database-locker lockAcquireSleepInterval="10000"/>
</amq:locker>
<amq:statements>
<amq:statements tablePrefix="TTT_" messageTableName="AMQ_MSGS2" durableSubAcksTableName="AMQ_ACKS2" lockTableName="AMQ_LOCK2"/>
</amq:statements>
<amq:adapter>
<amq:defaultJDBCAdapter/>
</amq:adapter>
</amq:jdbcPersistenceAdapter>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="vm://brokerConfigTest"/>
</amq:transportConnectors>
</amq:broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="target/derbyDb"/>
<property name="connectionAttributes" value=";create=true"/>
</bean>
</beans>