diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java index 8a56e87d93..38aab9be49 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java @@ -27,6 +27,7 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker { protected String name; protected boolean failIfLocked = false; protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; + protected LockableServiceSupport lockable; @Override public boolean keepAlive() throws IOException { @@ -38,6 +39,10 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker { this.lockAcquireSleepInterval = lockAcquireSleepInterval; } + public long getLockAcquireSleepInterval() { + return lockAcquireSleepInterval; + } + @Override public void setName(String name) { this.name = name; @@ -47,4 +52,9 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker { public void setFailIfLocked(boolean failIfLocked) { this.failIfLocked = failIfLocked; } + + @Override + public void setLockable(LockableServiceSupport lockableServiceSupport) { + this.lockable = lockableServiceSupport; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java index 78480e6e7c..4f83d30cfd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java @@ -61,6 +61,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L @Override public void setLocker(Locker locker) throws IOException { this.locker = locker; + locker.setLockable(this); if (this instanceof PersistenceAdapter) { this.locker.configure((PersistenceAdapter)this); } @@ -68,7 +69,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L public Locker getLocker() throws IOException { if (this.locker == null) { - this.locker = createDefaultLocker(); + setLocker(createDefaultLocker()); } return this.locker; } @@ -165,4 +166,8 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } + + public BrokerService getBrokerService() { + return this.brokerService; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java index 6415def641..11a2636061 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Locker.java @@ -54,6 +54,10 @@ public interface Locker extends Service { */ public void setFailIfLocked(boolean failIfLocked); + /** + * A reference to what is locked + */ + public void setLockable(LockableServiceSupport lockable); /** * Optionally configure the locker with the persistence adapter currently used diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java new file mode 100644 index 0000000000..e3cc8010d1 --- /dev/null +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/AbstractJDBCLocker.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import javax.sql.DataSource; +import org.apache.activemq.broker.AbstractLocker; +import org.apache.activemq.store.PersistenceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractJDBCLocker extends AbstractLocker { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class); + protected DataSource dataSource; + protected Statements statements; + + protected boolean createTablesOnStartup; + protected int queryTimeout = -1; + + public void configure(PersistenceAdapter adapter) throws IOException { + if (adapter instanceof JDBCPersistenceAdapter) { + this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource(); + this.statements = ((JDBCPersistenceAdapter) adapter).getStatements(); + } + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + + public void setStatements(Statements statements) { + this.statements = statements; + } + + protected void setQueryTimeout(Statement statement) throws SQLException { + if (queryTimeout > 0) { + statement.setQueryTimeout(queryTimeout); + } + } + + public int getQueryTimeout() { + return queryTimeout; + } + + public void setQueryTimeout(int queryTimeout) { + this.queryTimeout = queryTimeout; + } + + public void setCreateTablesOnStartup(boolean createTablesOnStartup) { + this.createTablesOnStartup = createTablesOnStartup; + } + + protected Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + protected void close(Connection connection) { + if (null != connection) { + try { + connection.close(); + } catch (SQLException e1) { + LOG.debug("exception while closing connection: " + e1, e1); + } + } + } + + protected void close(Statement statement) { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e1) { + LOG.debug("exception while closing statement: " + e1, e1); + } + } + } + + @Override + public void preStart() { + if (createTablesOnStartup) { + String[] createStatements = this.statements.getCreateLockSchemaStatements(); + + Connection connection = null; + Statement statement = null; + try { + connection = getConnection(); + statement = connection.createStatement(); + setQueryTimeout(statement); + + for (int i = 0; i < createStatements.length; i++) { + LOG.debug("Executing SQL: " + createStatements[i]); + try { + statement.execute(createStatements[i]); + } catch (SQLException e) { + LOG.info("Could not create lock tables; they could already exist." + " Failure was: " + + createStatements[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + + " Vendor code: " + e.getErrorCode()); + } + } + } catch (SQLException e) { + LOG.warn("Could not create lock tables; Failure Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + + " Vendor code: " + e.getErrorCode(), e); + } finally { + close(statement); + close(connection); + } + } + } + +} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java index 79c7a8464e..64d24ab713 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java @@ -21,11 +21,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; - -import javax.sql.DataSource; - -import org.apache.activemq.broker.AbstractLocker; -import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.util.Handler; import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; @@ -38,27 +33,15 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean element="database-locker" * */ -public class DefaultDatabaseLocker extends AbstractLocker { +public class DefaultDatabaseLocker extends AbstractJDBCLocker { private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class); - protected DataSource dataSource; - protected Statements statements; protected volatile PreparedStatement lockCreateStatement; protected volatile PreparedStatement lockUpdateStatement; protected volatile Connection connection; - protected volatile boolean stopping; protected Handler exceptionHandler; - protected int queryTimeout = 10; - public void configure(PersistenceAdapter adapter) throws IOException { - if (adapter instanceof JDBCPersistenceAdapter) { - this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource(); - this.statements = ((JDBCPersistenceAdapter) adapter).getStatements(); - } - } - public void doStart() throws Exception { - stopping = false; LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); String sql = statements.getLockCreateStatement(); @@ -73,7 +56,7 @@ public class DefaultDatabaseLocker extends AbstractLocker { break; } catch (Exception e) { try { - if (stopping) { + if (isStopping()) { throw new Exception( "Cannot start broker as being asked to shut down. " + "Interrupted attempt to acquire lock: " @@ -136,7 +119,6 @@ public class DefaultDatabaseLocker extends AbstractLocker { } public void doStop(ServiceStopper stopper) throws Exception { - stopping = true; try { if (lockCreateStatement != null) { lockCreateStatement.cancel(); @@ -178,9 +160,7 @@ public class DefaultDatabaseLocker extends AbstractLocker { try { lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement()); lockUpdateStatement.setLong(1, System.currentTimeMillis()); - if (queryTimeout > 0) { - lockUpdateStatement.setQueryTimeout(queryTimeout); - } + setQueryTimeout(lockUpdateStatement); int rows = lockUpdateStatement.executeUpdate(); if (rows == 1) { result=true; @@ -216,11 +196,4 @@ public class DefaultDatabaseLocker extends AbstractLocker { this.exceptionHandler = exceptionHandler; } - public int getQueryTimeout() { - return queryTimeout; - } - - public void setQueryTimeout(int queryTimeout) { - this.queryTimeout = queryTimeout; - } } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java index 37c7064576..029b1df6f8 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java @@ -24,10 +24,6 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.util.Date; import java.util.concurrent.TimeUnit; -import javax.sql.DataSource; - -import org.apache.activemq.broker.AbstractLocker; -import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; @@ -40,42 +36,27 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean element="lease-database-locker" * */ -public class LeaseDatabaseLocker extends AbstractLocker { +public class LeaseDatabaseLocker extends AbstractJDBCLocker { private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class); - protected DataSource dataSource; - protected Statements statements; - protected boolean stopping; protected int maxAllowableDiffFromDBTime = 0; protected long diffFromCurrentTime = Long.MAX_VALUE; protected String leaseHolderId; - protected int queryTimeout = -1; - JDBCPersistenceAdapter persistenceAdapter; - - public void configure(PersistenceAdapter adapter) throws IOException { - if (adapter instanceof JDBCPersistenceAdapter) { - this.persistenceAdapter = (JDBCPersistenceAdapter)adapter; - this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource(); - this.statements = ((JDBCPersistenceAdapter) adapter).getStatements(); - } - } - public void doStart() throws Exception { - stopping = false; - if (lockAcquireSleepInterval < persistenceAdapter.getLockKeepAlivePeriod()) { - LOG.warn("Persistence adapter keep alive period: " + persistenceAdapter.getLockKeepAlivePeriod() + if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) { + LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod() + ", which renews the lease, is less than lockAcquireSleepInterval: " + lockAcquireSleepInterval + ", the lease duration. These values will allow the lease to expire."); } - LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker"); + LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master"); String sql = statements.getLeaseObtainStatement(); LOG.debug(getLeaseHolderId() + " locking Query is "+sql); long now = 0l; - while (!stopping) { + while (!isStopping()) { Connection connection = null; PreparedStatement statement = null; try { @@ -110,43 +91,13 @@ public class LeaseDatabaseLocker extends AbstractLocker { LOG.info(getLeaseHolderId() + " failed to acquire lease. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again..."); TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval); } - if (stopping) { + if (isStopping()) { throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop"); } LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now) + " on dataSource: " + dataSource); } - private void setQueryTimeout(PreparedStatement statement) throws SQLException { - if (queryTimeout > 0) { - statement.setQueryTimeout(queryTimeout); - } - } - - private Connection getConnection() throws SQLException { - return dataSource.getConnection(); - } - - private void close(Connection connection) { - if (null != connection) { - try { - connection.close(); - } catch (SQLException e1) { - LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1); - } - } - } - - private void close(PreparedStatement statement) { - if (null != statement) { - try { - statement.close(); - } catch (SQLException e1) { - LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1); - } - } - } - private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException { PreparedStatement statement = null; try { @@ -188,8 +139,7 @@ public class LeaseDatabaseLocker extends AbstractLocker { } public void doStop(ServiceStopper stopper) throws Exception { - stopping = true; - if (persistenceAdapter.getBrokerService() != null && persistenceAdapter.getBrokerService().isRestartRequested()) { + if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested()) { // keep our lease for restart return; } @@ -244,7 +194,7 @@ public class LeaseDatabaseLocker extends AbstractLocker { } catch (Exception e) { LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e); IOException ioe = IOExceptionSupport.create(e); - persistenceAdapter.getBrokerService().handleIOException(ioe); + lockable.getBrokerService().handleIOException(ioe); throw ioe; } finally { close(statement); @@ -253,26 +203,10 @@ public class LeaseDatabaseLocker extends AbstractLocker { return result; } - public long getLockAcquireSleepInterval() { - return lockAcquireSleepInterval; - } - - public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { - this.lockAcquireSleepInterval = lockAcquireSleepInterval; - } - - public int getQueryTimeout() { - return queryTimeout; - } - - public void setQueryTimeout(int queryTimeout) { - this.queryTimeout = queryTimeout; - } - public String getLeaseHolderId() { if (leaseHolderId == null) { - if (persistenceAdapter.getBrokerService() != null) { - leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName(); + if (lockable.getBrokerService() != null) { + leaseHolderId = lockable.getBrokerService().getBrokerName(); } } return leaseHolderId; diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index d23adac5fc..5df6f2ef1f 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -57,6 +57,7 @@ public class Statements { private String removeAllMessagesStatement; private String removeAllSubscriptionsStatement; private String[] createSchemaStatements; + private String[] createLockSchemaStatements; private String[] dropSchemaStatements; private String lockCreateStatement; private String lockUpdateStatement; @@ -106,10 +107,6 @@ public class Statements { + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", - "CREATE TABLE " + getFullLockTableName() - + "( ID " + longDataType + " NOT NULL, TIME " + longDataType - + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )", - "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType, "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)", "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + stringIdDataType, @@ -121,7 +118,24 @@ public class Statements { "CREATE INDEX " + getFullAckTableName() + "_XIDX ON " + getFullAckTableName() + " (XID)" }; } - return createSchemaStatements; + getCreateLockSchemaStatements(); + String[] allCreateStatements = new String[createSchemaStatements.length + createLockSchemaStatements.length]; + System.arraycopy(createSchemaStatements, 0, allCreateStatements, 0, createSchemaStatements.length); + System.arraycopy(createLockSchemaStatements, 0, allCreateStatements, createSchemaStatements.length, createLockSchemaStatements.length); + + return allCreateStatements; + } + + public String[] getCreateLockSchemaStatements() { + if (createLockSchemaStatements == null) { + createLockSchemaStatements = new String[] { + "CREATE TABLE " + getFullLockTableName() + + "( ID " + longDataType + " NOT NULL, TIME " + longDataType + + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )", + "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)" + }; + } + return createLockSchemaStatements; } public String getDropAckPKAlterStatementEnd() { @@ -762,6 +776,10 @@ public class Statements { this.createSchemaStatements = createSchemaStatments; } + public void setCreateLockSchemaStatements(String[] createLockSchemaStatments) { + this.createLockSchemaStatements = createLockSchemaStatments; + } + public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatementWithPriority) { this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatementWithPriority; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java index 8cdef57b96..d69239ccf0 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java @@ -38,7 +38,6 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker { @Override public void doStart() throws Exception { - stopping = false; LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); PreparedStatement statement = null; @@ -57,7 +56,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker { } break; } catch (Exception e) { - if (stopping) { + if (isStopping()) { throw new Exception("Cannot start broker as being asked to shut down. Interrupted attempt to acquire lock: " + e, e); } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala index e7fa916fa3..368ea96fff 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala @@ -22,7 +22,7 @@ import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport} import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore} import java.net.{NetworkInterface, InetAddress} import org.fusesource.hawtdispatch._ -import org.apache.activemq.broker.Locker +import org.apache.activemq.broker.{LockableServiceSupport, Locker} import org.apache.activemq.store.PersistenceAdapter import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean @@ -190,6 +190,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore { def createDefaultLocker(): Locker = new Locker { + def setLockable(lockable: LockableServiceSupport) {} def configure(persistenceAdapter: PersistenceAdapter) {} def setFailIfLocked(failIfLocked: Boolean) {} def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java index ce1022255c..fb04803128 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java @@ -43,7 +43,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest verifyExpectedBroker(inflightMessageCount); if (++inflightMessageCount == failureCount) { LOG.info("STOPPING DB!@!!!!"); - final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate(); + final EmbeddedDataSource ds = ((SyncCreateDataSource)getExistingDataSource()).getDelegate(); ds.setShutdownDatabase("shutdown"); LOG.info("DB STOPPED!@!!!!"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java index 4ce01fba6c..c7b0ec6980 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java @@ -41,7 +41,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport { protected void setUp() throws Exception { // startup db - sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); + sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); super.setUp(); } @@ -109,61 +109,4 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport { return sharedDs; } - // prevent concurrent calls from attempting to create the db at the same time - // can result in "already exists in this jvm" errors - class SyncDataSource implements DataSource { - final EmbeddedDataSource delegate; - SyncDataSource(EmbeddedDataSource dataSource) { - this.delegate = dataSource; - } - @Override - public Connection getConnection() throws SQLException { - synchronized (this) { - return delegate.getConnection(); - } - } - - @Override - public Connection getConnection(String username, String password) throws SQLException { - synchronized (this) { - return delegate.getConnection(); - } - } - - @Override - public PrintWriter getLogWriter() throws SQLException { - return null; - } - - @Override - public void setLogWriter(PrintWriter out) throws SQLException { - } - - @Override - public void setLoginTimeout(int seconds) throws SQLException { - } - - @Override - public int getLoginTimeout() throws SQLException { - return 0; - } - - @Override - public T unwrap(Class iface) throws SQLException { - return null; - } - - @Override - public boolean isWrapperFor(Class iface) throws SQLException { - return false; - } - - EmbeddedDataSource getDelegate() { - return delegate; - } - - public Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } - }; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java new file mode 100644 index 0000000000..5331a226a6 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.logging.Logger; +import javax.sql.DataSource; +import org.apache.derby.jdbc.EmbeddedDataSource; + +// prevent concurrent calls from attempting to create the db at the same time +// can result in "already exists in this jvm" errors + +public class SyncCreateDataSource implements DataSource { + final EmbeddedDataSource delegate; + + SyncCreateDataSource(EmbeddedDataSource dataSource) { + this.delegate = dataSource; + } + + @Override + public Connection getConnection() throws SQLException { + synchronized (this) { + return delegate.getConnection(); + } + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + synchronized (this) { + return delegate.getConnection(); + } + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + EmbeddedDataSource getDelegate() { + return delegate; + } + + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java new file mode 100644 index 0000000000..ba59eb74b1 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import java.io.IOException; +import java.net.URI; +import javax.sql.DataSource; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; +import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; +import org.apache.activemq.store.jdbc.Statements; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.apache.activemq.util.IOHelper; +import org.apache.derby.jdbc.EmbeddedDataSource; + +public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport { + protected DataSource sharedDs; + protected String MASTER_URL = "tcp://localhost:62001"; + protected String SLAVE_URL = "tcp://localhost:62002"; + + protected void setUp() throws Exception { + // startup db + sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); + super.setUp(); + } + + protected void createMaster() throws Exception { + master = new BrokerService(); + master.setBrokerName("master"); + master.addConnector(MASTER_URL); + master.setUseJmx(false); + master.setPersistent(true); + master.setDeleteAllMessagesOnStartup(true); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter(); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setCreateTablesOnStartup(true); + leaseDatabaseLocker.setDataSource(getExistingDataSource()); + leaseDatabaseLocker.setStatements(new Statements()); + configureLocker(kahaDBPersistenceAdapter); + kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker); + configureBroker(master); + master.start(); + } + + protected void configureBroker(BrokerService brokerService) { + DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler(); + // we want any store io exception to stop the broker + stopBrokerOnStoreException.setIgnoreSQLExceptions(false); + brokerService.setIoExceptionHandler(stopBrokerOnStoreException); + } + + protected void createSlave() throws Exception { + // use a separate thread as the slave will block waiting for + // the exclusive db lock + Thread t = new Thread() { + public void run() { + try { + BrokerService broker = new BrokerService(); + broker.setBrokerName("slave"); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI(SLAVE_URL)); + broker.addConnector(connector); + broker.setUseJmx(false); + broker.setPersistent(true); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setDataSource(getExistingDataSource()); + leaseDatabaseLocker.setStatements(new Statements()); + configureLocker(kahaDBPersistenceAdapter); + kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker); + + configureBroker(broker); + broker.start(); + slave.set(broker); + slaveStarted.countDown(); + } catch (IllegalStateException expectedOnShutdown) { + } catch (Exception e) { + fail("failed to start slave broker, reason:" + e); + } + } + }; + t.start(); + } + + protected void configureLocker(KahaDBPersistenceAdapter kahaDBPersistenceAdapter) throws IOException { + kahaDBPersistenceAdapter.setLockKeepAlivePeriod(500); + kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500); + } + + protected DataSource getExistingDataSource() throws Exception { + return sharedDs; + } + +}