diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java index a5e1995358..bda5e936cc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java @@ -31,7 +31,7 @@ public abstract class AbstractLocker extends ServiceSupport implements Locker { @Override public boolean keepAlive() throws IOException { - return false; + return true; } @Override diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java b/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java new file mode 100644 index 0000000000..fce0b14c12 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.IOException; + +/** + * A lockable broker resource. Uses {@link Locker} to guarantee that only single instance is running + * + */ +public interface Lockable { + + /** + * Turn locking on/off on the resource + * + * @param useLock + */ + public void setUseLock(boolean useLock); + + /** + * Create a default locker + * + * @return default locker + * @throws IOException + */ + public Locker createDefaultLocker() throws IOException; + + /** + * Set locker to be used + * + * @param locker + * @throws IOException + */ + public void setLocker(Locker locker) throws IOException; + + /** + * Period (in milliseconds) on which {@link org.apache.activemq.broker.Locker#keepAlive()} should be checked + * + * @param lockKeepAlivePeriod + */ + public void setLockKeepAlivePeriod(long lockKeepAlivePeriod); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java new file mode 100644 index 0000000000..7ff448d28c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.ThreadPoolUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Helper class for working with services that requires locking + */ +public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware { + + private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class); + boolean useLock = true; + Locker locker; + long lockKeepAlivePeriod = 0; + private ScheduledFuture keepAliveTicket; + private ScheduledThreadPoolExecutor clockDaemon; + private BrokerService brokerService; + + /** + * Initialize resources before locking + * + * @throws Exception + */ + abstract public void init() throws Exception; + + @Override + public void setUseLock(boolean useLock) { + this.useLock = useLock; + } + + @Override + public void setLocker(Locker locker) throws IOException { + this.locker = locker; + if (this instanceof PersistenceAdapter) { + this.locker.configure((PersistenceAdapter)this); + } + } + + public Locker getLocker() throws IOException { + if (this.locker == null) { + this.locker = createDefaultLocker(); + } + return this.locker; + } + + @Override + public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) { + this.lockKeepAlivePeriod = lockKeepAlivePeriod; + } + + @Override + public void preStart() throws Exception { + init(); + if (useLock) { + if (getLocker() == null) { + LOG.warn("No locker configured"); + } else { + getLocker().start(); + if (lockKeepAlivePeriod > 0) { + keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() { + public void run() { + keepLockAlive(); + } + }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); + } + if (brokerService != null) { + brokerService.getBroker().nowMasterBroker(); + } + } + } + } + + @Override + public void postStop(ServiceStopper stopper) throws Exception { + if (useLock) { + if (keepAliveTicket != null) { + keepAliveTicket.cancel(false); + keepAliveTicket = null; + } + if (locker != null) { + getLocker().stop(); + } + ThreadPoolUtils.shutdown(clockDaemon); + } + } + + protected void keepLockAlive() { + boolean stop = false; + try { + Locker locker = getLocker(); + if (locker != null) { + if (!locker.keepAlive()) { + stop = true; + } + } + } catch (IOException e) { + LOG.warn("locker keepalive resulted in: " + e, e); + } + if (stop) { + stopBroker(); + } + } + + protected void stopBroker() { + // we can no longer keep the lock so lets fail + LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master"); + try { + brokerService.stop(); + } catch (Exception e) { + LOG.warn("Failure occurred while stopping broker"); + } + } + + public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { + if (clockDaemon == null) { + clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer"); + thread.setDaemon(true); + return thread; + } + }); + } + return clockDaemon; + } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java similarity index 85% rename from activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java rename to activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java index c6546f8980..00b249082b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java @@ -21,6 +21,7 @@ import java.io.IOException; import javax.sql.DataSource; +import org.apache.activemq.broker.LockableServiceSupport; import org.apache.activemq.util.IOHelper; import org.apache.derby.jdbc.EmbeddedDataSource; @@ -30,16 +31,16 @@ import org.apache.derby.jdbc.EmbeddedDataSource; * * */ -public class DataSourceSupport { +abstract public class DataSourceServiceSupport extends LockableServiceSupport { private String dataDirectory = IOHelper.getDefaultDataDirectory(); private File dataDirectoryFile; private DataSource dataSource; - public DataSourceSupport() { + public DataSourceServiceSupport() { } - public DataSourceSupport(DataSource dataSource) { + public DataSourceServiceSupport(DataSource dataSource) { this.dataSource = dataSource; } @@ -64,7 +65,7 @@ public class DataSourceSupport { public DataSource getDataSource() throws IOException { if (dataSource == null) { - dataSource = createDataSource(); + dataSource = createDataSource(getDataDirectoryFile().getCanonicalPath()); if (dataSource == null) { throw new IllegalArgumentException("No dataSource property has been configured"); } @@ -76,10 +77,10 @@ public class DataSourceSupport { this.dataSource = dataSource; } - protected DataSource createDataSource() throws IOException { + public static DataSource createDataSource(String homeDir) throws IOException { // Setup the Derby datasource. - System.setProperty("derby.system.home", getDataDirectoryFile().getCanonicalPath()); + System.setProperty("derby.system.home", homeDir); System.setProperty("derby.storage.fileSyncTransactionLog", "true"); System.setProperty("derby.storage.pageCacheSize", "100"); @@ -93,4 +94,6 @@ public class DataSourceSupport { return "" + dataSource; } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index b2d9c26ae0..476573dba9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -54,6 +54,7 @@ import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory; * * */ -public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, +public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); @@ -79,19 +80,17 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist private static FactoryFinder lockFactoryFinder = new FactoryFinder( "META-INF/services/org/apache/activemq/store/jdbc/lock/"); + public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000; + private WireFormat wireFormat = new OpenWireFormat(); private BrokerService brokerService; private Statements statements; private JDBCAdapter adapter; private MemoryTransactionStore transactionStore; private ScheduledThreadPoolExecutor clockDaemon; - private ScheduledFuture cleanupTicket, keepAliveTicket; + private ScheduledFuture cleanupTicket; private int cleanupPeriod = 1000 * 60 * 5; private boolean useExternalMessageReferences; - private boolean useDatabaseLock = true; - private long lockKeepAlivePeriod = 1000*30; - private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; - private Locker locker; private boolean createTablesOnStartup = true; private DataSource lockDataSource; private int transactionIsolation; @@ -106,6 +105,10 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; + { + setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD); + } + public JDBCPersistenceAdapter() { } @@ -281,8 +284,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist } } - - public void start() throws Exception { + @Override + public void init() throws Exception { getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); if (isCreateTablesOnStartup()) { @@ -299,26 +302,9 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist transactionContext.commit(); } } + } - if (isUseDatabaseLock()) { - Locker service = getLocker(); - if (service == null) { - LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter"); - } else { - service.start(); - if (lockKeepAlivePeriod > 0) { - keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() { - public void run() { - databaseLockKeepAlive(); - } - }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); - } - if (brokerService != null) { - brokerService.getBroker().nowMasterBroker(); - } - } - } - + public void doStart() throws Exception { // Cleanup the db periodically. if (cleanupPeriod > 0) { cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { @@ -331,21 +317,11 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist createMessageAudit(); } - public synchronized void stop() throws Exception { + public synchronized void doStop(ServiceStopper stopper) throws Exception { if (cleanupTicket != null) { cleanupTicket.cancel(true); cleanupTicket = null; } - if (keepAliveTicket != null) { - keepAliveTicket.cancel(false); - keepAliveTicket = null; - } - - // do not shutdown clockDaemon as it may kill the thread initiating shutdown - Locker service = getDatabaseLocker(); - if (service != null) { - service.stop(); - } } public void cleanup() { @@ -403,13 +379,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist return getLocker(); } - public Locker getLocker() throws IOException { - if (locker == null && isUseDatabaseLock()) { - setLocker(loadDataBaseLocker()); - } - return locker; - } - /** * Sets the database locker strategy to use to lock the database on startup * @throws IOException @@ -420,16 +389,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist setLocker(locker); } - /** - * Sets the database locker strategy to use to lock the database on startup - * @throws IOException - */ - public void setLocker(Locker locker) throws IOException { - this.locker = locker; - locker.configure(this); - locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval()); - } - public DataSource getLockDataSource() throws IOException { if (lockDataSource == null) { lockDataSource = getDataSource(); @@ -595,16 +554,15 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist this.createTablesOnStartup = createTablesOnStartup; } - public boolean isUseDatabaseLock() { - return useDatabaseLock; - } - /** + * @deprecated use {@link #setUseLock(boolean)} instead + * * Sets whether or not an exclusive database lock should be used to enable * JDBC Master/Slave. Enabled by default. */ + @Deprecated public void setUseDatabaseLock(boolean useDatabaseLock) { - this.useDatabaseLock = useDatabaseLock; + setUseLock(useDatabaseLock); } public static void log(String msg, SQLException e) { @@ -634,39 +592,13 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist public void setUsageManager(SystemUsage usageManager) { } - protected void databaseLockKeepAlive() { - boolean stop = false; - try { - Locker locker = getDatabaseLocker(); - if (locker != null) { - if (!locker.keepAlive()) { - stop = true; - } - } - } catch (IOException e) { - LOG.warn("databaselocker keepalive resulted in: " + e, e); - } - if (stop) { - stopBroker(); - } - } - - protected void stopBroker() { - // we can no longer keep the lock so lets fail - LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master"); - try { - brokerService.stop(); - } catch (Exception e) { - LOG.warn("Failure occurred while stopping broker"); - } - } - - protected Locker loadDataBaseLocker() throws IOException { + public Locker createDefaultLocker() throws IOException { DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock"); if (locker == null) { locker = new DefaultDatabaseLocker(); LOG.debug("Using default JDBC Locker: " + locker); } + locker.configure(this); return locker; } @@ -711,24 +643,15 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist return 0; } - public long getLockKeepAlivePeriod() { - return lockKeepAlivePeriod; - } - - public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) { - this.lockKeepAlivePeriod = lockKeepAlivePeriod; - } - - public long getLockAcquireSleepInterval() { - return lockAcquireSleepInterval; - } - /** + * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead + * * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker * not applied if DataBaseLocker is injected. + * */ - public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { - this.lockAcquireSleepInterval = lockAcquireSleepInterval; + public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException { + getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java index 36ebe5d725..865e29ce1e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java @@ -22,13 +22,15 @@ import java.io.IOException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.active.JournalImpl; import org.apache.activeio.journal.active.JournalLockedException; +import org.apache.activemq.broker.Locker; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.jdbc.DataSourceSupport; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.Statements; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory; * @org.apache.xbean.XBean * */ -public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { +public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory { private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; @@ -185,16 +187,12 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen jdbcPersistenceAdapter.setStatements(statements); } - public boolean isUseDatabaseLock() { - return jdbcPersistenceAdapter.isUseDatabaseLock(); - } - /** * Sets whether or not an exclusive database lock should be used to enable * JDBC Master/Slave. Enabled by default. */ public void setUseDatabaseLock(boolean useDatabaseLock) { - jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); + jdbcPersistenceAdapter.setUseLock(useDatabaseLock); } public boolean isCreateTablesOnStartup() { @@ -245,4 +243,18 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen } } + @Override + public Locker createDefaultLocker() throws IOException { + return null; + } + + @Override + public void init() throws Exception { + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception {} + + @Override + protected void doStart() throws Exception {} } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 3462703149..564a290b89 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -17,8 +17,8 @@ package org.apache.activemq.store.kahadb; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.LockableServiceSupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -37,6 +37,7 @@ import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.ServiceStopper; import java.io.File; import java.io.IOException; @@ -49,9 +50,8 @@ import java.util.Set; * @org.apache.xbean.XBean element="kahaDB" * */ -public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { +public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter { private final KahaDBStore letter = new KahaDBStore(); - private Locker locker; /** * @param context @@ -191,8 +191,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi * @throws Exception * @see org.apache.activemq.Service#start() */ - public void start() throws Exception { - getLocker().start(); + public void doStart() throws Exception { this.letter.start(); } @@ -200,12 +199,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi * @throws Exception * @see org.apache.activemq.Service#stop() */ - public void stop() throws Exception { - try { - this.letter.stop(); - } finally { - getLocker().stop(); - } + public void doStop(ServiceStopper stopper) throws Exception { + this.letter.stop(); } /** @@ -486,17 +481,13 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @return the databaseLockedWaitDelay - */ - public int getDatabaseLockedWaitDelay() { - return letter.getDatabaseLockedWaitDelay(); - } - - /** + * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead + * * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set */ - public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { - letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay); + @Deprecated + public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { + getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); } public boolean getForceRecoverIndex() { @@ -594,24 +585,15 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi return rc; } - public void setLocker(Locker locker) { - this.locker = locker; - } - - protected Locker getLocker() throws IOException { - if (this.locker == null) { - this.locker = createDefaultLocker(); - } - return this.locker; - } - - protected Locker createDefaultLocker() throws IOException { + public Locker createDefaultLocker() throws IOException { SharedFileLocker locker = new SharedFileLocker(); locker.configure(this); - locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay()); return locker; } + @Override + public void init() throws Exception {} + @Override public String toString() { String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index a3a6823f82..4db19151c2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -221,7 +221,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private int indexCacheSize = 10000; private boolean checkForCorruptJournalFiles = false; private boolean checksumJournalFiles = false; - private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; protected boolean forceRecoverIndex = false; private final Object checkpointThreadLock = new Object(); private boolean rewriteOnRedelivery = false; @@ -2308,20 +2307,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.directoryArchive = directoryArchive; } - /** - * @return the databaseLockedWaitDelay - */ - public int getDatabaseLockedWaitDelay() { - return this.databaseLockedWaitDelay; - } - - /** - * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set - */ - public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { - this.databaseLockedWaitDelay = databaseLockedWaitDelay; - } - public boolean isRewriteOnRedelivery() { return rewriteOnRedelivery; } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java index 821810fb85..bcffb910b7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java @@ -51,6 +51,7 @@ public abstract class ServiceSupport implements Service { boolean success = false; stopped.set(false); try { + preStart(); doStart(); success = true; } finally { @@ -70,6 +71,8 @@ public abstract class ServiceSupport implements Service { doStop(stopper); } catch (Exception e) { stopper.onException(this, e); + } finally { + postStop(stopper); } stopped.set(true); started.set(false); @@ -110,7 +113,23 @@ public abstract class ServiceSupport implements Service { this.serviceListeners.remove(l); } + /** + * + * handle for various operations after stopping the service (like locking) + * + * @throws Exception + */ + protected void postStop(ServiceStopper stopper) throws Exception {} + protected abstract void doStop(ServiceStopper stopper) throws Exception; + /** + * + * handle for various operations before starting the service (like locking) + * + * @throws Exception + */ + protected void preStart() throws Exception {} + protected abstract void doStart() throws Exception; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java index e13d93a9ed..7a543e9e80 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java @@ -17,14 +17,11 @@ package org.apache.activemq.broker.ft; import java.io.IOException; -import java.sql.Connection; import java.util.concurrent.TimeUnit; -import junit.framework.Test; + import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.store.jdbc.DataSourceSupport; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; -import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +31,7 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa @Override protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException { super.configureJdbcPersistenceAdapter(persistenceAdapter); - persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval()); + persistenceAdapter.getLocker().setLockAcquireSleepInterval(getLockAcquireSleepInterval()); persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod()); persistenceAdapter.setLocker(new LeaseDatabaseLocker()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java index 3c284b3a7c..41dcc42367 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java @@ -28,9 +28,10 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.jdbc.DataSourceSupport; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.apache.activemq.util.IOHelper; import org.apache.derby.jdbc.EmbeddedDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnection topic = false; verbose = true; // startup db - sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource(); + sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); broker = new BrokerService(); @@ -65,9 +66,9 @@ public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnection broker.setDeleteAllMessagesOnStartup(true); JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(); persistenceAdapter.setDataSource(sharedDs); - persistenceAdapter.setUseDatabaseLock(false); + persistenceAdapter.setUseLock(false); persistenceAdapter.setLockKeepAlivePeriod(500); - persistenceAdapter.setLockAcquireSleepInterval(500); + persistenceAdapter.getLocker().setLockAcquireSleepInterval(500); broker.setPersistenceAdapter(persistenceAdapter); broker.start(); super.setUp(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java index f3536676d3..cef95aa53b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java @@ -28,8 +28,9 @@ import javax.sql.DataSource; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.store.jdbc.DataSourceSupport; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.IOHelper; import org.apache.derby.jdbc.EmbeddedDataSource; public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest { @@ -39,7 +40,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest { protected void setUp() throws Exception { // startup db - sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource()); + sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); super.setUp(); } @@ -96,7 +97,7 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest { protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException { persistenceAdapter.setLockKeepAlivePeriod(500); - persistenceAdapter.setLockAcquireSleepInterval(500); + persistenceAdapter.getLocker().setLockAcquireSleepInterval(500); } protected DataSource getExistingDataSource() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java index 148b74f277..338c1417dd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java @@ -347,7 +347,7 @@ public class FailoverStaticNetworkTest { brokerA.setBrokerName("Pair"); brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker")); - ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000); + ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000); brokerA.start(); brokerA.waitUntilStopped(); @@ -378,7 +378,7 @@ public class FailoverStaticNetworkTest { // so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker")); - ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000); + ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000); brokerA1.start(); brokerA1.waitUntilStopped(); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java index 46e1b7368b..4c562091c4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java @@ -164,7 +164,7 @@ public class JDBCCommitExceptionTest extends TestCase { dataSource.setCreateDatabase("create"); jdbc.setDataSource(dataSource); - jdbc.setUseDatabaseLock(false); + jdbc.setUseLock(false); jdbc.deleteAllMessages(); broker.setPersistenceAdapter(jdbc);