https://issues.apache.org/jira/browse/AMQ-4005 - introducing Lockable interface and LockableServiceSupport for easier dealing with locks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1383400 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-09-11 13:17:39 +00:00
parent aaf03576ee
commit 170b86a861
14 changed files with 316 additions and 179 deletions

View File

@ -31,7 +31,7 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker {
@Override
public boolean keepAlive() throws IOException {
return false;
return true;
}
@Override

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.io.IOException;
/**
* A lockable broker resource. Uses {@link Locker} to guarantee that only single instance is running
*
*/
public interface Lockable {
/**
* Turn locking on/off on the resource
*
* @param useLock
*/
public void setUseLock(boolean useLock);
/**
* Create a default locker
*
* @return default locker
* @throws IOException
*/
public Locker createDefaultLocker() throws IOException;
/**
* Set locker to be used
*
* @param locker
* @throws IOException
*/
public void setLocker(Locker locker) throws IOException;
/**
* Period (in milliseconds) on which {@link org.apache.activemq.broker.Locker#keepAlive()} should be checked
*
* @param lockKeepAlivePeriod
*/
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Helper class for working with services that requires locking
*/
public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
boolean useLock = true;
Locker locker;
long lockKeepAlivePeriod = 0;
private ScheduledFuture<?> keepAliveTicket;
private ScheduledThreadPoolExecutor clockDaemon;
private BrokerService brokerService;
/**
* Initialize resources before locking
*
* @throws Exception
*/
abstract public void init() throws Exception;
@Override
public void setUseLock(boolean useLock) {
this.useLock = useLock;
}
@Override
public void setLocker(Locker locker) throws IOException {
this.locker = locker;
if (this instanceof PersistenceAdapter) {
this.locker.configure((PersistenceAdapter)this);
}
}
public Locker getLocker() throws IOException {
if (this.locker == null) {
this.locker = createDefaultLocker();
}
return this.locker;
}
@Override
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
this.lockKeepAlivePeriod = lockKeepAlivePeriod;
}
@Override
public void preStart() throws Exception {
init();
if (useLock) {
if (getLocker() == null) {
LOG.warn("No locker configured");
} else {
getLocker().start();
if (lockKeepAlivePeriod > 0) {
keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
public void run() {
keepLockAlive();
}
}, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
}
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
}
}
}
@Override
public void postStop(ServiceStopper stopper) throws Exception {
if (useLock) {
if (keepAliveTicket != null) {
keepAliveTicket.cancel(false);
keepAliveTicket = null;
}
if (locker != null) {
getLocker().stop();
}
ThreadPoolUtils.shutdown(clockDaemon);
}
}
protected void keepLockAlive() {
boolean stop = false;
try {
Locker locker = getLocker();
if (locker != null) {
if (!locker.keepAlive()) {
stop = true;
}
}
} catch (IOException e) {
LOG.warn("locker keepalive resulted in: " + e, e);
}
if (stop) {
stopBroker();
}
}
protected void stopBroker() {
// we can no longer keep the lock so lets fail
LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker");
}
}
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
if (clockDaemon == null) {
clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
thread.setDaemon(true);
return thread;
}
});
}
return clockDaemon;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import javax.sql.DataSource;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
@ -30,16 +31,16 @@ import org.apache.derby.jdbc.EmbeddedDataSource;
*
*
*/
public class DataSourceSupport {
abstract public class DataSourceServiceSupport extends LockableServiceSupport {
private String dataDirectory = IOHelper.getDefaultDataDirectory();
private File dataDirectoryFile;
private DataSource dataSource;
public DataSourceSupport() {
public DataSourceServiceSupport() {
}
public DataSourceSupport(DataSource dataSource) {
public DataSourceServiceSupport(DataSource dataSource) {
this.dataSource = dataSource;
}
@ -64,7 +65,7 @@ public class DataSourceSupport {
public DataSource getDataSource() throws IOException {
if (dataSource == null) {
dataSource = createDataSource();
dataSource = createDataSource(getDataDirectoryFile().getCanonicalPath());
if (dataSource == null) {
throw new IllegalArgumentException("No dataSource property has been configured");
}
@ -76,10 +77,10 @@ public class DataSourceSupport {
this.dataSource = dataSource;
}
protected DataSource createDataSource() throws IOException {
public static DataSource createDataSource(String homeDir) throws IOException {
// Setup the Derby datasource.
System.setProperty("derby.system.home", getDataDirectoryFile().getCanonicalPath());
System.setProperty("derby.system.home", homeDir);
System.setProperty("derby.storage.fileSyncTransactionLog", "true");
System.setProperty("derby.storage.pageCacheSize", "100");
@ -93,4 +94,6 @@ public class DataSourceSupport {
return "" + dataSource;
}
}

View File

@ -54,6 +54,7 @@ import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
*
*
*/
public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter,
BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
@ -79,19 +80,17 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
private static FactoryFinder lockFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/lock/");
public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
private WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
private Statements statements;
private JDBCAdapter adapter;
private MemoryTransactionStore transactionStore;
private ScheduledThreadPoolExecutor clockDaemon;
private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
private ScheduledFuture<?> cleanupTicket;
private int cleanupPeriod = 1000 * 60 * 5;
private boolean useExternalMessageReferences;
private boolean useDatabaseLock = true;
private long lockKeepAlivePeriod = 1000*30;
private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
private Locker locker;
private boolean createTablesOnStartup = true;
private DataSource lockDataSource;
private int transactionIsolation;
@ -106,6 +105,10 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
{
setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
}
public JDBCPersistenceAdapter() {
}
@ -281,8 +284,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
}
public void start() throws Exception {
@Override
public void init() throws Exception {
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
if (isCreateTablesOnStartup()) {
@ -299,26 +302,9 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
transactionContext.commit();
}
}
}
if (isUseDatabaseLock()) {
Locker service = getLocker();
if (service == null) {
LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
} else {
service.start();
if (lockKeepAlivePeriod > 0) {
keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
public void run() {
databaseLockKeepAlive();
}
}, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
}
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
}
}
public void doStart() throws Exception {
// Cleanup the db periodically.
if (cleanupPeriod > 0) {
cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
@ -331,21 +317,11 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
createMessageAudit();
}
public synchronized void stop() throws Exception {
public synchronized void doStop(ServiceStopper stopper) throws Exception {
if (cleanupTicket != null) {
cleanupTicket.cancel(true);
cleanupTicket = null;
}
if (keepAliveTicket != null) {
keepAliveTicket.cancel(false);
keepAliveTicket = null;
}
// do not shutdown clockDaemon as it may kill the thread initiating shutdown
Locker service = getDatabaseLocker();
if (service != null) {
service.stop();
}
}
public void cleanup() {
@ -403,13 +379,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
return getLocker();
}
public Locker getLocker() throws IOException {
if (locker == null && isUseDatabaseLock()) {
setLocker(loadDataBaseLocker());
}
return locker;
}
/**
* Sets the database locker strategy to use to lock the database on startup
* @throws IOException
@ -420,16 +389,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
setLocker(locker);
}
/**
* Sets the database locker strategy to use to lock the database on startup
* @throws IOException
*/
public void setLocker(Locker locker) throws IOException {
this.locker = locker;
locker.configure(this);
locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
}
public DataSource getLockDataSource() throws IOException {
if (lockDataSource == null) {
lockDataSource = getDataSource();
@ -595,16 +554,15 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
this.createTablesOnStartup = createTablesOnStartup;
}
public boolean isUseDatabaseLock() {
return useDatabaseLock;
}
/**
* @deprecated use {@link #setUseLock(boolean)} instead
*
* Sets whether or not an exclusive database lock should be used to enable
* JDBC Master/Slave. Enabled by default.
*/
@Deprecated
public void setUseDatabaseLock(boolean useDatabaseLock) {
this.useDatabaseLock = useDatabaseLock;
setUseLock(useDatabaseLock);
}
public static void log(String msg, SQLException e) {
@ -634,39 +592,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setUsageManager(SystemUsage usageManager) {
}
protected void databaseLockKeepAlive() {
boolean stop = false;
try {
Locker locker = getDatabaseLocker();
if (locker != null) {
if (!locker.keepAlive()) {
stop = true;
}
}
} catch (IOException e) {
LOG.warn("databaselocker keepalive resulted in: " + e, e);
}
if (stop) {
stopBroker();
}
}
protected void stopBroker() {
// we can no longer keep the lock so lets fail
LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker");
}
}
protected Locker loadDataBaseLocker() throws IOException {
public Locker createDefaultLocker() throws IOException {
DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
if (locker == null) {
locker = new DefaultDatabaseLocker();
LOG.debug("Using default JDBC Locker: " + locker);
}
locker.configure(this);
return locker;
}
@ -711,24 +643,15 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
return 0;
}
public long getLockKeepAlivePeriod() {
return lockKeepAlivePeriod;
}
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
this.lockKeepAlivePeriod = lockKeepAlivePeriod;
}
public long getLockAcquireSleepInterval() {
return lockAcquireSleepInterval;
}
/**
* @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
*
* millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
* not applied if DataBaseLocker is injected.
*
*/
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
this.lockAcquireSleepInterval = lockAcquireSleepInterval;
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
}
/**

View File

@ -22,13 +22,15 @@ import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean
*
*/
public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
@ -185,16 +187,12 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen
jdbcPersistenceAdapter.setStatements(statements);
}
public boolean isUseDatabaseLock() {
return jdbcPersistenceAdapter.isUseDatabaseLock();
}
/**
* Sets whether or not an exclusive database lock should be used to enable
* JDBC Master/Slave. Enabled by default.
*/
public void setUseDatabaseLock(boolean useDatabaseLock) {
jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
}
public boolean isCreateTablesOnStartup() {
@ -245,4 +243,18 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen
}
}
@Override
public Locker createDefaultLocker() throws IOException {
return null;
}
@Override
public void init() throws Exception {
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {}
@Override
protected void doStart() throws Exception {}
}

View File

@ -17,8 +17,8 @@
package org.apache.activemq.store.kahadb;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -37,6 +37,7 @@ import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import java.io.File;
import java.io.IOException;
@ -49,9 +50,8 @@ import java.util.Set;
* @org.apache.xbean.XBean element="kahaDB"
*
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
private final KahaDBStore letter = new KahaDBStore();
private Locker locker;
/**
* @param context
@ -191,8 +191,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
* @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception {
getLocker().start();
public void doStart() throws Exception {
this.letter.start();
}
@ -200,12 +199,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
* @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
try {
this.letter.stop();
} finally {
getLocker().stop();
}
public void doStop(ServiceStopper stopper) throws Exception {
this.letter.stop();
}
/**
@ -486,17 +481,13 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
}
/**
* @return the databaseLockedWaitDelay
*/
public int getDatabaseLockedWaitDelay() {
return letter.getDatabaseLockedWaitDelay();
}
/**
* @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
*
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
@Deprecated
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
}
public boolean getForceRecoverIndex() {
@ -594,24 +585,15 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
return rc;
}
public void setLocker(Locker locker) {
this.locker = locker;
}
protected Locker getLocker() throws IOException {
if (this.locker == null) {
this.locker = createDefaultLocker();
}
return this.locker;
}
protected Locker createDefaultLocker() throws IOException {
public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
locker.configure(this);
locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay());
return locker;
}
@Override
public void init() throws Exception {}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";

View File

@ -221,7 +221,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean rewriteOnRedelivery = false;
@ -2308,20 +2307,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.directoryArchive = directoryArchive;
}
/**
* @return the databaseLockedWaitDelay
*/
public int getDatabaseLockedWaitDelay() {
return this.databaseLockedWaitDelay;
}
/**
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
this.databaseLockedWaitDelay = databaseLockedWaitDelay;
}
public boolean isRewriteOnRedelivery() {
return rewriteOnRedelivery;
}

View File

@ -51,6 +51,7 @@ public abstract class ServiceSupport implements Service {
boolean success = false;
stopped.set(false);
try {
preStart();
doStart();
success = true;
} finally {
@ -70,6 +71,8 @@ public abstract class ServiceSupport implements Service {
doStop(stopper);
} catch (Exception e) {
stopper.onException(this, e);
} finally {
postStop(stopper);
}
stopped.set(true);
started.set(false);
@ -110,7 +113,23 @@ public abstract class ServiceSupport implements Service {
this.serviceListeners.remove(l);
}
/**
*
* handle for various operations after stopping the service (like locking)
*
* @throws Exception
*/
protected void postStop(ServiceStopper stopper) throws Exception {}
protected abstract void doStop(ServiceStopper stopper) throws Exception;
/**
*
* handle for various operations before starting the service (like locking)
*
* @throws Exception
*/
protected void preStart() throws Exception {}
protected abstract void doStart() throws Exception;
}

View File

@ -17,14 +17,11 @@
package org.apache.activemq.broker.ft;
import java.io.IOException;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +31,7 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
@Override
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
super.configureJdbcPersistenceAdapter(persistenceAdapter);
persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
persistenceAdapter.getLocker().setLockAcquireSleepInterval(getLockAcquireSleepInterval());
persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
persistenceAdapter.setLocker(new LeaseDatabaseLocker());
}

View File

@ -28,9 +28,10 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,7 +52,7 @@ public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnection
topic = false;
verbose = true;
// startup db
sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
broker = new BrokerService();
@ -65,9 +66,9 @@ public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnection
broker.setDeleteAllMessagesOnStartup(true);
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
persistenceAdapter.setDataSource(sharedDs);
persistenceAdapter.setUseDatabaseLock(false);
persistenceAdapter.setUseLock(false);
persistenceAdapter.setLockKeepAlivePeriod(500);
persistenceAdapter.setLockAcquireSleepInterval(500);
persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
broker.setPersistenceAdapter(persistenceAdapter);
broker.start();
super.setUp();

View File

@ -28,8 +28,9 @@ import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@ -39,7 +40,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
protected void setUp() throws Exception {
// startup db
sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
super.setUp();
}
@ -96,7 +97,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
persistenceAdapter.setLockKeepAlivePeriod(500);
persistenceAdapter.setLockAcquireSleepInterval(500);
persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
}
protected DataSource getExistingDataSource() throws Exception {

View File

@ -347,7 +347,7 @@ public class FailoverStaticNetworkTest {
brokerA.setBrokerName("Pair");
brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
brokerA.start();
brokerA.waitUntilStopped();
@ -378,7 +378,7 @@ public class FailoverStaticNetworkTest {
// so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store
brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
brokerA1.start();
brokerA1.waitUntilStopped();

View File

@ -164,7 +164,7 @@ public class JDBCCommitExceptionTest extends TestCase {
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.setUseDatabaseLock(false);
jdbc.setUseLock(false);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);