diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index f30bb986bc..70676a7400 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -205,7 +205,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ public void doStop(ServiceStopper stopper) throws Exception { //drain down async jobs LOG.info("Stopping async queue tasks"); - this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + if (this.globalQueueSemaphore != null) { + this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + } synchronized (this.asyncQueueMap) { for (StoreQueueTask task : this.asyncQueueMap.values()) { task.cancel(); @@ -213,7 +215,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ this.asyncQueueMap.clear(); } LOG.info("Stopping async topic tasks"); - this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + if (this.globalTopicSemaphore != null) { + this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + } synchronized (this.asyncTopicMap) { for (StoreTopicTask task : this.asyncTopicMap.values()) { task.cancel(); diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java index 5e6b434e29..e4beed7ec3 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java @@ -49,9 +49,12 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class FailoverManagedClusterTest extends TestCase { - + private static final Log LOG = LogFactory.getLog(FailoverManagedClusterTest.class); + long txGenerator = System.currentTimeMillis(); private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616"; @@ -61,12 +64,25 @@ public class FailoverManagedClusterTest extends TestCase { private BrokerService master; private BrokerService slave; + private CountDownLatch slaveThreadStarted = new CountDownLatch(1); + @Override protected void setUp() throws Exception { createAndStartMaster(); createAndStartSlave(); } + @Override + protected void tearDown() throws Exception { + if (slave != null) { + slave.stop(); + } + if (master != null) { + master.stop(); + } + } + + private void createAndStartMaster() throws Exception { master = new BrokerService(); @@ -88,8 +104,9 @@ public class FailoverManagedClusterTest extends TestCase { new Thread(new Runnable() { public void run() { try { + slaveThreadStarted.countDown(); slave.start(); - System.out.println("slave has started"); + LOG.info("slave has started"); } catch (Exception e) { e.printStackTrace(); } @@ -112,7 +129,7 @@ public class FailoverManagedClusterTest extends TestCase { final StubMessageEndpoint endpoint = new StubMessageEndpoint() { public void onMessage(Message message) { - System.out.println("Received message " + message); + LOG.info("Received message " + message); super.onMessage(message); messageDelivered.countDown(); }; @@ -144,18 +161,14 @@ public class FailoverManagedClusterTest extends TestCase { } catch (InterruptedException e) { } - // Send the broker a message to that endpoint MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); - - // force a failover + slaveThreadStarted.await(10, TimeUnit.SECONDS); + + // force a failover before send + LOG.info("Stopping master to force failover.."); master.stop(); - slave.waitUntilStarted(); - - try { - Thread.sleep(2000); - } catch (InterruptedException ie) { - // ignore - } + master = null; + assertTrue("slave started ok", slave.waitUntilStarted()); producer.send(session.createTextMessage("Hello, again!")); diff --git a/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java b/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java index 36848325d9..70a8f8b847 100644 --- a/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.util.Date; /** * Used to lock a File. @@ -55,7 +56,9 @@ public class LockFile { } IOHelper.mkdirs(file.getParentFile()); - + if (System.getProperty(getVmLockKey()) != null) { + throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm."); + } if (lock == null) { readFile = new RandomAccessFile(file, "rw"); IOException reason = null; @@ -66,6 +69,7 @@ public class LockFile { } if (lock != null) { lockCounter++; + System.setProperty(getVmLockKey(), new Date().toString()); } else { // new read file for next attempt closeReadFile(); @@ -94,6 +98,7 @@ public class LockFile { if (lock != null) { try { lock.release(); + System.getProperties().remove(getVmLockKey()); } catch (Throwable ignore) { } lock = null; @@ -105,6 +110,10 @@ public class LockFile { } } + private String getVmLockKey() throws IOException { + return getClass().getName() + ".lock." + file.getCanonicalPath(); + } + private void closeReadFile() { // close the file. if (readFile != null) {