mirror of https://github.com/apache/activemq.git
have KahaDB lock work in vm, so master slave tests can work ok. preserver kaha behaiour in this regard as it makes testing simpler. fix npe on shutdown if start fails
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
94261594d3
commit
e85dda9c64
|
@ -205,7 +205,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
|
||||||
public void doStop(ServiceStopper stopper) throws Exception {
|
public void doStop(ServiceStopper stopper) throws Exception {
|
||||||
//drain down async jobs
|
//drain down async jobs
|
||||||
LOG.info("Stopping async queue tasks");
|
LOG.info("Stopping async queue tasks");
|
||||||
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
|
if (this.globalQueueSemaphore != null) {
|
||||||
|
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
synchronized (this.asyncQueueMap) {
|
synchronized (this.asyncQueueMap) {
|
||||||
for (StoreQueueTask task : this.asyncQueueMap.values()) {
|
for (StoreQueueTask task : this.asyncQueueMap.values()) {
|
||||||
task.cancel();
|
task.cancel();
|
||||||
|
@ -213,7 +215,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
|
||||||
this.asyncQueueMap.clear();
|
this.asyncQueueMap.clear();
|
||||||
}
|
}
|
||||||
LOG.info("Stopping async topic tasks");
|
LOG.info("Stopping async topic tasks");
|
||||||
this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
|
if (this.globalTopicSemaphore != null) {
|
||||||
|
this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
synchronized (this.asyncTopicMap) {
|
synchronized (this.asyncTopicMap) {
|
||||||
for (StoreTopicTask task : this.asyncTopicMap.values()) {
|
for (StoreTopicTask task : this.asyncTopicMap.values()) {
|
||||||
task.cancel();
|
task.cancel();
|
||||||
|
|
|
@ -49,9 +49,12 @@ import junit.framework.TestCase;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
public class FailoverManagedClusterTest extends TestCase {
|
public class FailoverManagedClusterTest extends TestCase {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FailoverManagedClusterTest.class);
|
||||||
|
|
||||||
long txGenerator = System.currentTimeMillis();
|
long txGenerator = System.currentTimeMillis();
|
||||||
|
|
||||||
private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
|
private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
|
||||||
|
@ -61,12 +64,25 @@ public class FailoverManagedClusterTest extends TestCase {
|
||||||
|
|
||||||
private BrokerService master;
|
private BrokerService master;
|
||||||
private BrokerService slave;
|
private BrokerService slave;
|
||||||
|
private CountDownLatch slaveThreadStarted = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
createAndStartMaster();
|
createAndStartMaster();
|
||||||
createAndStartSlave();
|
createAndStartSlave();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (slave != null) {
|
||||||
|
slave.stop();
|
||||||
|
}
|
||||||
|
if (master != null) {
|
||||||
|
master.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void createAndStartMaster() throws Exception {
|
private void createAndStartMaster() throws Exception {
|
||||||
master = new BrokerService();
|
master = new BrokerService();
|
||||||
|
@ -88,8 +104,9 @@ public class FailoverManagedClusterTest extends TestCase {
|
||||||
new Thread(new Runnable() {
|
new Thread(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
slaveThreadStarted.countDown();
|
||||||
slave.start();
|
slave.start();
|
||||||
System.out.println("slave has started");
|
LOG.info("slave has started");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -112,7 +129,7 @@ public class FailoverManagedClusterTest extends TestCase {
|
||||||
|
|
||||||
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
|
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
System.out.println("Received message " + message);
|
LOG.info("Received message " + message);
|
||||||
super.onMessage(message);
|
super.onMessage(message);
|
||||||
messageDelivered.countDown();
|
messageDelivered.countDown();
|
||||||
};
|
};
|
||||||
|
@ -144,18 +161,14 @@ public class FailoverManagedClusterTest extends TestCase {
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the broker a message to that endpoint
|
|
||||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
|
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
|
||||||
|
slaveThreadStarted.await(10, TimeUnit.SECONDS);
|
||||||
// force a failover
|
|
||||||
|
// force a failover before send
|
||||||
|
LOG.info("Stopping master to force failover..");
|
||||||
master.stop();
|
master.stop();
|
||||||
slave.waitUntilStarted();
|
master = null;
|
||||||
|
assertTrue("slave started ok", slave.waitUntilStarted());
|
||||||
try {
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
producer.send(session.createTextMessage("Hello, again!"));
|
producer.send(session.createTextMessage("Hello, again!"));
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.channels.FileLock;
|
import java.nio.channels.FileLock;
|
||||||
import java.nio.channels.OverlappingFileLockException;
|
import java.nio.channels.OverlappingFileLockException;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to lock a File.
|
* Used to lock a File.
|
||||||
|
@ -55,7 +56,9 @@ public class LockFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
IOHelper.mkdirs(file.getParentFile());
|
IOHelper.mkdirs(file.getParentFile());
|
||||||
|
if (System.getProperty(getVmLockKey()) != null) {
|
||||||
|
throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm.");
|
||||||
|
}
|
||||||
if (lock == null) {
|
if (lock == null) {
|
||||||
readFile = new RandomAccessFile(file, "rw");
|
readFile = new RandomAccessFile(file, "rw");
|
||||||
IOException reason = null;
|
IOException reason = null;
|
||||||
|
@ -66,6 +69,7 @@ public class LockFile {
|
||||||
}
|
}
|
||||||
if (lock != null) {
|
if (lock != null) {
|
||||||
lockCounter++;
|
lockCounter++;
|
||||||
|
System.setProperty(getVmLockKey(), new Date().toString());
|
||||||
} else {
|
} else {
|
||||||
// new read file for next attempt
|
// new read file for next attempt
|
||||||
closeReadFile();
|
closeReadFile();
|
||||||
|
@ -94,6 +98,7 @@ public class LockFile {
|
||||||
if (lock != null) {
|
if (lock != null) {
|
||||||
try {
|
try {
|
||||||
lock.release();
|
lock.release();
|
||||||
|
System.getProperties().remove(getVmLockKey());
|
||||||
} catch (Throwable ignore) {
|
} catch (Throwable ignore) {
|
||||||
}
|
}
|
||||||
lock = null;
|
lock = null;
|
||||||
|
@ -105,6 +110,10 @@ public class LockFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getVmLockKey() throws IOException {
|
||||||
|
return getClass().getName() + ".lock." + file.getCanonicalPath();
|
||||||
|
}
|
||||||
|
|
||||||
private void closeReadFile() {
|
private void closeReadFile() {
|
||||||
// close the file.
|
// close the file.
|
||||||
if (readFile != null) {
|
if (readFile != null) {
|
||||||
|
|
Loading…
Reference in New Issue