From e85dda9c64c27a3449da0c44b5314aa8be06cdc9 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 18 Jun 2010 12:27:51 +0000 Subject: [PATCH] have KahaDB lock work in vm, so master slave tests can work ok. preserver kaha behaiour in this regard as it makes testing simpler. fix npe on shutdown if start fails git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955973 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/kahadb/KahaDBStore.java | 8 +++- .../ra/FailoverManagedClusterTest.java | 39 ++++++++++++------- .../java/org/apache/kahadb/util/LockFile.java | 11 +++++- 3 files changed, 42 insertions(+), 16 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index f30bb986bc..70676a7400 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -205,7 +205,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ public void doStop(ServiceStopper stopper) throws Exception { //drain down async jobs LOG.info("Stopping async queue tasks"); - this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + if (this.globalQueueSemaphore != null) { + this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + } synchronized (this.asyncQueueMap) { for (StoreQueueTask task : this.asyncQueueMap.values()) { task.cancel(); @@ -213,7 +215,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ this.asyncQueueMap.clear(); } LOG.info("Stopping async topic tasks"); - this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + if (this.globalTopicSemaphore != null) { + this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); + } synchronized (this.asyncTopicMap) { for (StoreTopicTask task : this.asyncTopicMap.values()) { task.cancel(); diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java index 5e6b434e29..e4beed7ec3 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java @@ -49,9 +49,12 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class FailoverManagedClusterTest extends TestCase { - + private static final Log LOG = LogFactory.getLog(FailoverManagedClusterTest.class); + long txGenerator = System.currentTimeMillis(); private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616"; @@ -61,12 +64,25 @@ public class FailoverManagedClusterTest extends TestCase { private BrokerService master; private BrokerService slave; + private CountDownLatch slaveThreadStarted = new CountDownLatch(1); + @Override protected void setUp() throws Exception { createAndStartMaster(); createAndStartSlave(); } + @Override + protected void tearDown() throws Exception { + if (slave != null) { + slave.stop(); + } + if (master != null) { + master.stop(); + } + } + + private void createAndStartMaster() throws Exception { master = new BrokerService(); @@ -88,8 +104,9 @@ public class FailoverManagedClusterTest extends TestCase { new Thread(new Runnable() { public void run() { try { + slaveThreadStarted.countDown(); slave.start(); - System.out.println("slave has started"); + LOG.info("slave has started"); } catch (Exception e) { e.printStackTrace(); } @@ -112,7 +129,7 @@ public class FailoverManagedClusterTest extends TestCase { final StubMessageEndpoint endpoint = new StubMessageEndpoint() { public void onMessage(Message message) { - System.out.println("Received message " + message); + LOG.info("Received message " + message); super.onMessage(message); messageDelivered.countDown(); }; @@ -144,18 +161,14 @@ public class FailoverManagedClusterTest extends TestCase { } catch (InterruptedException e) { } - // Send the broker a message to that endpoint MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); - - // force a failover + slaveThreadStarted.await(10, TimeUnit.SECONDS); + + // force a failover before send + LOG.info("Stopping master to force failover.."); master.stop(); - slave.waitUntilStarted(); - - try { - Thread.sleep(2000); - } catch (InterruptedException ie) { - // ignore - } + master = null; + assertTrue("slave started ok", slave.waitUntilStarted()); producer.send(session.createTextMessage("Hello, again!")); diff --git a/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java b/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java index 36848325d9..70a8f8b847 100644 --- a/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.util.Date; /** * Used to lock a File. @@ -55,7 +56,9 @@ public class LockFile { } IOHelper.mkdirs(file.getParentFile()); - + if (System.getProperty(getVmLockKey()) != null) { + throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm."); + } if (lock == null) { readFile = new RandomAccessFile(file, "rw"); IOException reason = null; @@ -66,6 +69,7 @@ public class LockFile { } if (lock != null) { lockCounter++; + System.setProperty(getVmLockKey(), new Date().toString()); } else { // new read file for next attempt closeReadFile(); @@ -94,6 +98,7 @@ public class LockFile { if (lock != null) { try { lock.release(); + System.getProperties().remove(getVmLockKey()); } catch (Throwable ignore) { } lock = null; @@ -105,6 +110,10 @@ public class LockFile { } } + private String getVmLockKey() throws IOException { + return getClass().getName() + ".lock." + file.getCanonicalPath(); + } + private void closeReadFile() { // close the file. if (readFile != null) {