git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@603762 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-12-12 22:53:59 +00:00
parent a5a6a2a5d1
commit 1c4ef90ff3
8 changed files with 191 additions and 17 deletions

View File

@ -272,8 +272,22 @@ public interface Store {
*/
long size();
/**
* @return true if persistent indexes are used by default
*/
public boolean isPersistentIndex();
/**
* Set a persistent index as the default if the parameter is true
* @param persistentIndex
*/
public void setPersistentIndex(boolean persistentIndex);
/**
* An explict call to initialize - this will also be called
* implicitly for any other operation on the store.
* @throws IOException
*/
public void initialize() throws IOException;
}

View File

@ -445,10 +445,10 @@ public class KahaStore implements Store {
}
if (!initialized) {
LOG.info("Kaha Store using data directory " + directory);
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock();
LOG.info("Kaha Store using data directory " + directory);
DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
IndexItem mapRoot = new IndexItem();
@ -486,6 +486,7 @@ public class KahaStore implements Store {
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock();
if (lock == null) {
initialized=false;
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by another application");
} else {
@ -493,6 +494,7 @@ public class KahaStore implements Store {
}
}
} else { // already locked
initialized=false;
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by this application.");
}
@ -501,7 +503,7 @@ public class KahaStore implements Store {
private synchronized void unlock() throws IOException {
if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
System.getProperties().remove(getPropertyKey());
System.clearProperty(getPropertyKey());
if (lock.isValid()) {
lock.release();
}
@ -510,7 +512,6 @@ public class KahaStore implements Store {
}
private String getPropertyKey() throws IOException {
// Is replaceAll() needed? Should test without it.
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.store.amq;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
@ -83,6 +85,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
private static final boolean BROKEN_FILE_LOCK;
private static final boolean DISABLE_LOCKING;
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
@ -112,7 +118,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
private String directoryPath = "";
private RandomAccessFile lockFile;
private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING;
public String getBrokerName() {
return this.brokerName;
@ -141,13 +150,17 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if (brokerService != null) {
this.directory = brokerService.getBrokerDataDirectory();
} else {
this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
this.directory = new File(directory, "amqstore");
this.directoryPath=directory.getAbsolutePath();
}
}
if (this.directoryArchive == null) {
this.directoryArchive = new File(this.directory,"archive");
}
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock();
LOG.info("AMQStore starting using directory: " + directory);
this.directory.mkdirs();
if (archiveDataLogs) {
@ -240,6 +253,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if (!started.compareAndSet(true, false)) {
return;
}
if (lockFile != null) {
lockFile.close();
lockFile = null;
}
unlock();
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
Scheduler.cancel(periodicCheckpointTask);
@ -818,7 +836,15 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
}
public boolean isDisableLocking() {
return disableLocking;
}
public void setDisableLocking(boolean disableLocking) {
this.disableLocking = disableLocking;
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
@ -836,4 +862,72 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
set.remove(dataFileId);
}
}
protected void lock() throws IOException, InterruptedException {
boolean logged = false;
boolean aquiredLock = false;
do {
if (doLock()) {
aquiredLock = true;
} else {
if (!logged) {
LOG.warn("Waiting to Lock the Store " + getDirectory());
logged = true;
}
Thread.sleep(1000);
}
if (aquiredLock && logged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
}
} while (!aquiredLock && !disableLocking);
}
private synchronized void unlock() throws IOException {
if (!disableLocking && (null != directory) && (null != lock)) {
System.clearProperty(getPropertyKey());
if (lock.isValid()) {
lock.release();
}
lock = null;
}
}
protected boolean doLock() throws IOException {
boolean result = true;
if (!disableLocking && directory != null && lock == null) {
String key = getPropertyKey();
String property = System.getProperty(key);
if (null == property) {
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock();
if (lock == null) {
result = false;
} else {
System.setProperty(key, new Date().toString());
}
}
} else { // already locked
result = false;
}
}
return result;
}
private String getPropertyKey() throws IOException {
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}
static {
BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
}
}

View File

@ -56,6 +56,7 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
return recoverMessage(message);
} else {
LOG.error("Message id " + ref + " could not be recovered from the data store!");
Thread.dumpStack();
}
return false;
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
@ -53,7 +54,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
private static final Integer INDEX_VERSION = new Integer(3);
@ -87,7 +88,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
@Override
public synchronized void start() throws Exception {
super.start();
Store store = getStateStore();
Store store = getStateStore();
boolean empty = store.getMapContainerIds().isEmpty();
stateMap = store.getMapContainer("state", STORE_STATE);
stateMap.load();

View File

@ -43,7 +43,7 @@ public class BitArrayBin {
maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
list = new LinkedList<BitArray>();
for (int i = 0; i < maxNumberOfArrays; i++) {
list.add(new BitArray());
list.add(null);
}
}
@ -130,6 +130,10 @@ public class BitArrayBin {
bin = list.size() - 1;
}
answer = list.get(bin);
if (answer == null) {
answer = new BitArray();
list.set(bin, answer);
}
}
return answer;
}

View File

@ -38,20 +38,15 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
protected void setUp() throws Exception {
messageCount = 10000;
if (System.getProperty("basedir") == null) {
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
failureCount = super.messageCount / 2;
super.topic = isTopic();
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
brokerFactory.afterPropertiesSet();
master = brokerFactory.getBroker();
brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
brokerFactory.afterPropertiesSet();
slave = brokerFactory.getBroker();
master.start();
slave.start();
createMaster();
createSlave();
// wait for thing to connect
Thread.sleep(1000);
super.setUp();
@ -88,4 +83,18 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
protected boolean isTopic() {
return false;
}
protected void createMaster() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
brokerFactory.afterPropertiesSet();
master = brokerFactory.getBroker();
master.start();
}
protected void createSlave() throws Exception {
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
brokerFactory.afterPropertiesSet();
slave = brokerFactory.getBroker();
slave.start();
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.ft;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
public class QueueMasterSlaveTestUsingSharedFileTest extends
QueueMasterSlaveTest {
protected String getSlaveXml() {
return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
}
protected String getMasterXml() {
return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
}
protected void createSlave() throws Exception {
new Thread(new Runnable() {
public void run() {
try {
QueueMasterSlaveTestUsingSharedFileTest.super.createSlave();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}