Fix for the failing QueueMasterSlaveTestUsingSharedFileTest on windows. The lock on the AsyncDataManager was not being retried.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@636037 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-11 17:59:24 +00:00
parent 75d43c027f
commit 67d38fafd1
4 changed files with 55 additions and 23 deletions

View File

@ -113,11 +113,7 @@ public class AsyncDataManager {
} }
started = true; started = true;
directory.mkdirs(); lock();
synchronized (this) {
controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
controlFile.lock();
}
ByteSequence sequence = controlFile.load(); ByteSequence sequence = controlFile.load();
if (sequence != null && sequence.getLength() > 0) { if (sequence != null && sequence.getLength() > 0) {
@ -194,6 +190,16 @@ public class AsyncDataManager {
Scheduler.executePeriodically(cleanupTask, 1000 * 30); Scheduler.executePeriodically(cleanupTask, 1000 * 30);
} }
public void lock() throws IOException {
synchronized (this) {
if( controlFile == null ) {
directory.mkdirs();
controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
}
controlFile.lock();
}
}
protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException { protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
if (location == null) { if (location == null) {
location = new Location(); location = new Location();

View File

@ -30,9 +30,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -90,6 +90,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
private static final boolean BROKEN_FILE_LOCK; private static final boolean BROKEN_FILE_LOCK;
private static final boolean DISABLE_LOCKING; private static final boolean DISABLE_LOCKING;
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
private AsyncDataManager asyncDataManager; private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter; private ReferenceStoreAdapter referenceStoreAdapter;
@ -125,6 +126,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private RandomAccessFile lockFile; private RandomAccessFile lockFile;
private FileLock lock; private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING; private boolean disableLocking = DISABLE_LOCKING;
private boolean failIfJournalIsLocked;
public String getBrokerName() { public String getBrokerName() {
return this.brokerName; return this.brokerName;
@ -186,6 +188,24 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
taskRunnerFactory = createTaskRunnerFactory(); taskRunnerFactory = createTaskRunnerFactory();
} }
if (failIfJournalIsLocked) {
asyncDataManager.lock();
} else {
while (true) {
try {
asyncDataManager.lock();
break;
} catch (IOException e) {
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
try {
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
asyncDataManager.start(); asyncDataManager.start();
if (deleteAllMessages) { if (deleteAllMessages) {
asyncDataManager.delete(); asyncDataManager.delete();

View File

@ -17,6 +17,10 @@
package org.apache.activemq.broker.ft; package org.apache.activemq.broker.ft;
import java.io.File; import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest; import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -32,7 +36,8 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
private static final transient Log LOG = LogFactory.getLog(QueueMasterSlaveTest.class); private static final transient Log LOG = LogFactory.getLog(QueueMasterSlaveTest.class);
protected BrokerService master; protected BrokerService master;
protected BrokerService slave; protected AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>();
protected CountDownLatch slaveStarted = new CountDownLatch(1);
protected int inflightMessageCount; protected int inflightMessageCount;
protected int failureCount = 50; protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
@ -62,7 +67,12 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
slave.stop();
slaveStarted.await(5, TimeUnit.SECONDS);
BrokerService brokerService = slave.get();
if( brokerService!=null ) {
brokerService.stop();
}
master.stop(); master.stop();
} }
@ -84,16 +94,18 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
} }
protected void createMaster() throws Exception { protected void createMaster() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml())); BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
brokerFactory.afterPropertiesSet(); brokerFactory.afterPropertiesSet();
master = brokerFactory.getBroker(); master = brokerFactory.getBroker();
master.start(); master.start();
} }
protected void createSlave() throws Exception { protected void createSlave() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml())); BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
brokerFactory.afterPropertiesSet(); brokerFactory.afterPropertiesSet();
slave = brokerFactory.getBroker(); BrokerService broker = brokerFactory.getBroker();
slave.start(); broker.start();
slave.set(broker);
slaveStarted.countDown();
} }
} }

View File

@ -16,10 +16,6 @@
*/ */
package org.apache.activemq.broker.ft; package org.apache.activemq.broker.ft;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
public class QueueMasterSlaveTestUsingSharedFileTest extends public class QueueMasterSlaveTestUsingSharedFileTest extends
QueueMasterSlaveTest { QueueMasterSlaveTest {
@ -31,17 +27,15 @@ public class QueueMasterSlaveTestUsingSharedFileTest extends
return "org/apache/activemq/broker/ft/sharedFileMaster.xml"; return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
} }
protected void createSlave() throws Exception { protected void createSlave() throws Exception {
// Start the Brokers async since starting them up could be a blocking operation..
new Thread(new Runnable() { new Thread(new Runnable() {
public void run() { public void run() {
try { try {
QueueMasterSlaveTestUsingSharedFileTest.super.createSlave(); QueueMasterSlaveTestUsingSharedFileTest.super.createSlave();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
}).start(); }).start();