https://issues.apache.org/jira/browse/AMQ-4365 - allow lease locker to be used by kahadb - remove deps on jdbc pa. LockableService now passes a reference to a locker so it can pull the brokerService, extracted some of the jdbc lock common stuff, additional test kahadb with jdbc lease

This commit is contained in:
gtully 2013-09-20 15:33:24 +01:00
parent 0f90695db7
commit efaa351db7
13 changed files with 383 additions and 174 deletions

View File

@ -27,6 +27,7 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker {
protected String name;
protected boolean failIfLocked = false;
protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected LockableServiceSupport lockable;
@Override
public boolean keepAlive() throws IOException {
@ -38,6 +39,10 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker {
this.lockAcquireSleepInterval = lockAcquireSleepInterval;
}
public long getLockAcquireSleepInterval() {
return lockAcquireSleepInterval;
}
@Override
public void setName(String name) {
this.name = name;
@ -47,4 +52,9 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker {
public void setFailIfLocked(boolean failIfLocked) {
this.failIfLocked = failIfLocked;
}
@Override
public void setLockable(LockableServiceSupport lockableServiceSupport) {
this.lockable = lockableServiceSupport;
}
}

View File

@ -61,6 +61,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
@Override
public void setLocker(Locker locker) throws IOException {
this.locker = locker;
locker.setLockable(this);
if (this instanceof PersistenceAdapter) {
this.locker.configure((PersistenceAdapter)this);
}
@ -68,7 +69,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
public Locker getLocker() throws IOException {
if (this.locker == null) {
this.locker = createDefaultLocker();
setLocker(createDefaultLocker());
}
return this.locker;
}
@ -165,4 +166,8 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public BrokerService getBrokerService() {
return this.brokerService;
}
}

View File

@ -54,6 +54,10 @@ public interface Locker extends Service {
*/
public void setFailIfLocked(boolean failIfLocked);
/**
* A reference to what is locked
*/
public void setLockable(LockableServiceSupport lockable);
/**
* Optionally configure the locker with the persistence adapter currently used

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.store.PersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractJDBCLocker extends AbstractLocker {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCLocker.class);
protected DataSource dataSource;
protected Statements statements;
protected boolean createTablesOnStartup;
protected int queryTimeout = -1;
public void configure(PersistenceAdapter adapter) throws IOException {
if (adapter instanceof JDBCPersistenceAdapter) {
this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
}
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
public void setStatements(Statements statements) {
this.statements = statements;
}
protected void setQueryTimeout(Statement statement) throws SQLException {
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
}
public int getQueryTimeout() {
return queryTimeout;
}
public void setQueryTimeout(int queryTimeout) {
this.queryTimeout = queryTimeout;
}
public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
this.createTablesOnStartup = createTablesOnStartup;
}
protected Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
protected void close(Connection connection) {
if (null != connection) {
try {
connection.close();
} catch (SQLException e1) {
LOG.debug("exception while closing connection: " + e1, e1);
}
}
}
protected void close(Statement statement) {
if (null != statement) {
try {
statement.close();
} catch (SQLException e1) {
LOG.debug("exception while closing statement: " + e1, e1);
}
}
}
@Override
public void preStart() {
if (createTablesOnStartup) {
String[] createStatements = this.statements.getCreateLockSchemaStatements();
Connection connection = null;
Statement statement = null;
try {
connection = getConnection();
statement = connection.createStatement();
setQueryTimeout(statement);
for (int i = 0; i < createStatements.length; i++) {
LOG.debug("Executing SQL: " + createStatements[i]);
try {
statement.execute(createStatements[i]);
} catch (SQLException e) {
LOG.info("Could not create lock tables; they could already exist." + " Failure was: "
+ createStatements[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ " Vendor code: " + e.getErrorCode());
}
}
} catch (SQLException e) {
LOG.warn("Could not create lock tables; Failure Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ " Vendor code: " + e.getErrorCode(), e);
} finally {
close(statement);
close(connection);
}
}
}
}

View File

@ -21,11 +21,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource;
import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Handler;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
@ -38,27 +33,15 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="database-locker"
*
*/
public class DefaultDatabaseLocker extends AbstractLocker {
public class DefaultDatabaseLocker extends AbstractJDBCLocker {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
protected DataSource dataSource;
protected Statements statements;
protected volatile PreparedStatement lockCreateStatement;
protected volatile PreparedStatement lockUpdateStatement;
protected volatile Connection connection;
protected volatile boolean stopping;
protected Handler<Exception> exceptionHandler;
protected int queryTimeout = 10;
public void configure(PersistenceAdapter adapter) throws IOException {
if (adapter instanceof JDBCPersistenceAdapter) {
this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
}
}
public void doStart() throws Exception {
stopping = false;
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
String sql = statements.getLockCreateStatement();
@ -73,7 +56,7 @@ public class DefaultDatabaseLocker extends AbstractLocker {
break;
} catch (Exception e) {
try {
if (stopping) {
if (isStopping()) {
throw new Exception(
"Cannot start broker as being asked to shut down. "
+ "Interrupted attempt to acquire lock: "
@ -136,7 +119,6 @@ public class DefaultDatabaseLocker extends AbstractLocker {
}
public void doStop(ServiceStopper stopper) throws Exception {
stopping = true;
try {
if (lockCreateStatement != null) {
lockCreateStatement.cancel();
@ -178,9 +160,7 @@ public class DefaultDatabaseLocker extends AbstractLocker {
try {
lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
lockUpdateStatement.setLong(1, System.currentTimeMillis());
if (queryTimeout > 0) {
lockUpdateStatement.setQueryTimeout(queryTimeout);
}
setQueryTimeout(lockUpdateStatement);
int rows = lockUpdateStatement.executeUpdate();
if (rows == 1) {
result=true;
@ -216,11 +196,4 @@ public class DefaultDatabaseLocker extends AbstractLocker {
this.exceptionHandler = exceptionHandler;
}
public int getQueryTimeout() {
return queryTimeout;
}
public void setQueryTimeout(int queryTimeout) {
this.queryTimeout = queryTimeout;
}
}

View File

@ -24,10 +24,6 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
@ -40,42 +36,27 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="lease-database-locker"
*
*/
public class LeaseDatabaseLocker extends AbstractLocker {
public class LeaseDatabaseLocker extends AbstractJDBCLocker {
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
protected DataSource dataSource;
protected Statements statements;
protected boolean stopping;
protected int maxAllowableDiffFromDBTime = 0;
protected long diffFromCurrentTime = Long.MAX_VALUE;
protected String leaseHolderId;
protected int queryTimeout = -1;
JDBCPersistenceAdapter persistenceAdapter;
public void configure(PersistenceAdapter adapter) throws IOException {
if (adapter instanceof JDBCPersistenceAdapter) {
this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
}
}
public void doStart() throws Exception {
stopping = false;
if (lockAcquireSleepInterval < persistenceAdapter.getLockKeepAlivePeriod()) {
LOG.warn("Persistence adapter keep alive period: " + persistenceAdapter.getLockKeepAlivePeriod()
if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) {
LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod()
+ ", which renews the lease, is less than lockAcquireSleepInterval: " + lockAcquireSleepInterval
+ ", the lease duration. These values will allow the lease to expire.");
}
LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
String sql = statements.getLeaseObtainStatement();
LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
long now = 0l;
while (!stopping) {
while (!isStopping()) {
Connection connection = null;
PreparedStatement statement = null;
try {
@ -110,43 +91,13 @@ public class LeaseDatabaseLocker extends AbstractLocker {
LOG.info(getLeaseHolderId() + " failed to acquire lease. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
}
if (stopping) {
if (isStopping()) {
throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
}
LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now) + " on dataSource: " + dataSource);
}
private void setQueryTimeout(PreparedStatement statement) throws SQLException {
if (queryTimeout > 0) {
statement.setQueryTimeout(queryTimeout);
}
}
private Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
private void close(Connection connection) {
if (null != connection) {
try {
connection.close();
} catch (SQLException e1) {
LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
}
}
}
private void close(PreparedStatement statement) {
if (null != statement) {
try {
statement.close();
} catch (SQLException e1) {
LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
}
}
}
private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
PreparedStatement statement = null;
try {
@ -188,8 +139,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
}
public void doStop(ServiceStopper stopper) throws Exception {
stopping = true;
if (persistenceAdapter.getBrokerService() != null && persistenceAdapter.getBrokerService().isRestartRequested()) {
if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested()) {
// keep our lease for restart
return;
}
@ -244,7 +194,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
} catch (Exception e) {
LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
lockable.getBrokerService().handleIOException(ioe);
throw ioe;
} finally {
close(statement);
@ -253,26 +203,10 @@ public class LeaseDatabaseLocker extends AbstractLocker {
return result;
}
public long getLockAcquireSleepInterval() {
return lockAcquireSleepInterval;
}
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
this.lockAcquireSleepInterval = lockAcquireSleepInterval;
}
public int getQueryTimeout() {
return queryTimeout;
}
public void setQueryTimeout(int queryTimeout) {
this.queryTimeout = queryTimeout;
}
public String getLeaseHolderId() {
if (leaseHolderId == null) {
if (persistenceAdapter.getBrokerService() != null) {
leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
if (lockable.getBrokerService() != null) {
leaseHolderId = lockable.getBrokerService().getBrokerName();
}
}
return leaseHolderId;

View File

@ -57,6 +57,7 @@ public class Statements {
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
private String[] createSchemaStatements;
private String[] createLockSchemaStatements;
private String[] dropSchemaStatements;
private String lockCreateStatement;
private String lockUpdateStatement;
@ -106,10 +107,6 @@ public class Statements {
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
"CREATE TABLE " + getFullLockTableName()
+ "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + stringIdDataType,
@ -121,7 +118,24 @@ public class Statements {
"CREATE INDEX " + getFullAckTableName() + "_XIDX ON " + getFullAckTableName() + " (XID)"
};
}
return createSchemaStatements;
getCreateLockSchemaStatements();
String[] allCreateStatements = new String[createSchemaStatements.length + createLockSchemaStatements.length];
System.arraycopy(createSchemaStatements, 0, allCreateStatements, 0, createSchemaStatements.length);
System.arraycopy(createLockSchemaStatements, 0, allCreateStatements, createSchemaStatements.length, createLockSchemaStatements.length);
return allCreateStatements;
}
public String[] getCreateLockSchemaStatements() {
if (createLockSchemaStatements == null) {
createLockSchemaStatements = new String[] {
"CREATE TABLE " + getFullLockTableName()
+ "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)"
};
}
return createLockSchemaStatements;
}
public String getDropAckPKAlterStatementEnd() {
@ -762,6 +776,10 @@ public class Statements {
this.createSchemaStatements = createSchemaStatments;
}
public void setCreateLockSchemaStatements(String[] createLockSchemaStatments) {
this.createLockSchemaStatements = createLockSchemaStatments;
}
public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatementWithPriority) {
this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatementWithPriority;
}

View File

@ -38,7 +38,6 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
@Override
public void doStart() throws Exception {
stopping = false;
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
PreparedStatement statement = null;
@ -57,7 +56,7 @@ public class TransactDatabaseLocker extends DefaultDatabaseLocker {
}
break;
} catch (Exception e) {
if (stopping) {
if (isStopping()) {
throw new Exception("Cannot start broker as being asked to shut down. Interrupted attempt to acquire lock: " + e, e);
}

View File

@ -22,7 +22,7 @@ import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
import java.net.{NetworkInterface, InetAddress}
import org.fusesource.hawtdispatch._
import org.apache.activemq.broker.Locker
import org.apache.activemq.broker.{LockableServiceSupport, Locker}
import org.apache.activemq.store.PersistenceAdapter
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
@ -190,6 +190,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
def createDefaultLocker(): Locker = new Locker {
def setLockable(lockable: LockableServiceSupport) {}
def configure(persistenceAdapter: PersistenceAdapter) {}
def setFailIfLocked(failIfLocked: Boolean) {}
def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {}

View File

@ -43,7 +43,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
verifyExpectedBroker(inflightMessageCount);
if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate();
final EmbeddedDataSource ds = ((SyncCreateDataSource)getExistingDataSource()).getDelegate();
ds.setShutdownDatabase("shutdown");
LOG.info("DB STOPPED!@!!!!");

View File

@ -41,7 +41,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
super.setUp();
}
@ -109,61 +109,4 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
return sharedDs;
}
// prevent concurrent calls from attempting to create the db at the same time
// can result in "already exists in this jvm" errors
class SyncDataSource implements DataSource {
final EmbeddedDataSource delegate;
SyncDataSource(EmbeddedDataSource dataSource) {
this.delegate = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public PrintWriter getLogWriter() throws SQLException {
return null;
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
}
@Override
public int getLoginTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
EmbeddedDataSource getDelegate() {
return delegate;
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
};
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;
import org.apache.derby.jdbc.EmbeddedDataSource;
// prevent concurrent calls from attempting to create the db at the same time
// can result in "already exists in this jvm" errors
public class SyncCreateDataSource implements DataSource {
final EmbeddedDataSource delegate;
SyncCreateDataSource(EmbeddedDataSource dataSource) {
this.delegate = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
synchronized (this) {
return delegate.getConnection();
}
}
@Override
public PrintWriter getLogWriter() throws SQLException {
return null;
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
}
@Override
public int getLoginTimeout() throws SQLException {
return 0;
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
EmbeddedDataSource getDelegate() {
return delegate;
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.net.URI;
import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
super.setUp();
}
protected void createMaster() throws Exception {
master = new BrokerService();
master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setCreateTablesOnStartup(true);
leaseDatabaseLocker.setDataSource(getExistingDataSource());
leaseDatabaseLocker.setStatements(new Statements());
configureLocker(kahaDBPersistenceAdapter);
kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
configureBroker(master);
master.start();
}
protected void configureBroker(BrokerService brokerService) {
DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
// we want any store io exception to stop the broker
stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
}
protected void createSlave() throws Exception {
// use a separate thread as the slave will block waiting for
// the exclusive db lock
Thread t = new Thread() {
public void run() {
try {
BrokerService broker = new BrokerService();
broker.setBrokerName("slave");
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(SLAVE_URL));
broker.addConnector(connector);
broker.setUseJmx(false);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setDataSource(getExistingDataSource());
leaseDatabaseLocker.setStatements(new Statements());
configureLocker(kahaDBPersistenceAdapter);
kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
configureBroker(broker);
broker.start();
slave.set(broker);
slaveStarted.countDown();
} catch (IllegalStateException expectedOnShutdown) {
} catch (Exception e) {
fail("failed to start slave broker, reason:" + e);
}
}
};
t.start();
}
protected void configureLocker(KahaDBPersistenceAdapter kahaDBPersistenceAdapter) throws IOException {
kahaDBPersistenceAdapter.setLockKeepAlivePeriod(500);
kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
}
protected DataSource getExistingDataSource() throws Exception {
return sharedDs;
}
}