git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1416959 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-12-04 14:24:07 +00:00
parent f5dff68d73
commit 7461c78a19
2 changed files with 180 additions and 124 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -50,7 +51,14 @@ import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.*;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
@ -75,6 +83,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="amqPersistenceAdapter"
*
*/
@Deprecated
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware, JournaledStore {
private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
@ -120,7 +129,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private RandomAccessFile lockFile;
private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING;
private boolean failIfJournalIsLocked;
private boolean failIfJournalIsLocked;
private boolean lockLogged;
private boolean lockAquired;
private boolean recoverReferenceStore=true;
@ -132,6 +141,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return this.brokerName;
}
@Override
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
if (this.referenceStoreAdapter != null) {
@ -143,10 +153,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return brokerService;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
@ -234,6 +246,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
LOG.info("Active data files: " + files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
@Override
public boolean iterate() {
doCheckpoint();
return false;
@ -268,6 +281,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// Do a checkpoint periodically.
periodicCheckpointTask = new Runnable() {
@Override
public void run() {
checkpoint(false);
}
@ -275,6 +289,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
periodicCleanupTask = new Runnable() {
@Override
public void run() {
cleanup();
}
@ -290,6 +305,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
@Override
public void stop() throws Exception {
if (!started.compareAndSet(true, false)) {
@ -347,6 +363,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
*
* @param sync
*/
@Override
public void checkpoint(boolean sync) {
try {
if (asyncDataManager == null) {
@ -454,6 +471,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
}
@Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
@ -469,6 +487,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
}
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
AMQMessageStore store = queues.get(destination);
if (store == null) {
@ -484,6 +503,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
AMQTopicMessageStore store = topics.get(destinationName);
if (store == null) {
@ -504,6 +524,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
*
* @param destination
*/
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
AMQMessageStore store= queues.remove(destination);
referenceStoreAdapter.removeQueueMessageStore(destination);
@ -514,37 +535,43 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
*
* @param destination
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
@Override
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
@Override
public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
@Override
public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
@Override
public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
@Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
public boolean isPersistentIndex() {
return persistentIndex;
}
return persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
/**
* @param location
@ -698,12 +725,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
try {
return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
} catch (IOException ioe) {
LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
brokerService.handleIOException(ioe);
throw ioe;
try {
return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
} catch (IOException ioe) {
LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
brokerService.handleIOException(ioe);
throw ioe;
}
}
@ -713,6 +740,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return writeCommand(trace, sync);
}
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
@ -725,6 +753,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return transactionStore;
}
@Override
public synchronized void deleteAllMessages() throws IOException {
deleteAllMessages = true;
}
@ -796,6 +825,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return usageManager;
}
@Override
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
}
@ -813,10 +843,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
@Override
public synchronized File getDirectory() {
return directory;
}
@Override
public synchronized void setDirectory(File directory) {
this.directory = directory;
}
@ -836,31 +868,32 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.referenceStoreAdapter = referenceStoreAdapter;
}
@Override
public long size(){
return storeSize.get();
}
public boolean isUseNio() {
return useNio;
}
public boolean isUseNio() {
return useNio;
}
public void setUseNio(boolean useNio) {
this.useNio = useNio;
}
public void setUseNio(boolean useNio) {
this.useNio = useNio;
}
public int getMaxFileLength() {
return maxFileLength;
}
public int getMaxFileLength() {
return maxFileLength;
}
/**
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMaxFileLength(int maxFileLength) {
this.maxFileLength = maxFileLength;
}
public void setMaxFileLength(int maxFileLength) {
this.maxFileLength = maxFileLength;
}
public long getCleanupInterval() {
public long getCleanupInterval() {
return cleanupInterval;
}
@ -913,11 +946,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
public void setIndexLoadFactor(int factor){
this.indexLoadFactor=factor;
this.indexLoadFactor=factor;
}
public int getIndexLoadFactor(){
return this.indexLoadFactor;
return this.indexLoadFactor;
}
public int getMaxReferenceFileLength() {
@ -1007,21 +1040,21 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
if (map == null) {
map = new ConcurrentHashMap<Integer, AtomicInteger>();
dataFilesInProgress.put(store, map);
}
AtomicInteger count = map.get(dataFileId);
if (count == null) {
count = new AtomicInteger(0);
map.put(dataFileId, count);
}
count.incrementAndGet();
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
if (map == null) {
map = new ConcurrentHashMap<Integer, AtomicInteger>();
dataFilesInProgress.put(store, map);
}
AtomicInteger count = map.get(dataFileId);
if (count == null) {
count = new AtomicInteger(0);
map.put(dataFileId, count);
}
count.incrementAndGet();
}
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
if (map != null) {
AtomicInteger count = map.get(dataFileId);
@ -1038,7 +1071,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
protected void lock() throws Exception {
protected void lock() throws Exception {
lockLogged = false;
lockAquired = false;
do {
@ -1055,7 +1088,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
} while (!lockAquired && !disableLocking);
}
private synchronized void unlock() throws IOException {
private synchronized void unlock() throws IOException {
if (!disableLocking && (null != lock)) {
//clear property doesn't work on some platforms
System.getProperties().remove(getPropertyKey());
@ -1071,9 +1104,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
protected boolean doLock() throws IOException {
boolean result = true;
if (!disableLocking && directory != null && lock == null) {
protected boolean doLock() throws IOException {
boolean result = true;
if (!disableLocking && directory != null && lock == null) {
String key = getPropertyKey();
String property = System.getProperty(key);
if (null == property) {
@ -1089,23 +1122,24 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
result = false;
}
}
return result;
}
return result;
}
private String getPropertyKey() throws IOException {
private String getPropertyKey() throws IOException {
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}
static {
BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
}
static {
BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
}
@Override
public long getLastProducerSequenceId(ProducerId id) {
// reference store send has adequate duplicate suppression
return -1;

View File

@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean
*
*/
@Deprecated
public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -78,7 +79,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
private boolean persistentIndex = true;
private BrokerService brokerService;
public KahaPersistenceAdapter(AtomicLong size) {
this.storeSize=size;
}
@ -87,6 +87,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
this(new AtomicLong());
}
@Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
try {
@ -104,6 +105,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return rc;
}
@Override
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = queues.get(destination);
if (rc == null) {
@ -117,6 +119,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return rc;
}
@Override
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
throws IOException {
TopicMessageStore rc = topics.get(destination);
@ -143,14 +146,15 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
*
* @param destination Destination to forget
*/
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
try{
if(theStore!=null){
theStore.deleteMapContainer(destination,"queue-data");
}
if(theStore!=null){
theStore.deleteMapContainer(destination,"queue-data");
}
}catch(IOException e ){
LOG.error("Failed to remove store map container for queue:"+destination, e);
LOG.error("Failed to remove store map container for queue:"+destination, e);
}
}
@ -159,6 +163,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
*
* @param destination Destination to forget
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
@ -168,6 +173,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return result;
}
@Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
while (true) {
@ -194,32 +200,39 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return transactionStore;
}
@Override
public void beginTransaction(ConnectionContext context) {
}
@Override
public void commitTransaction(ConnectionContext context) throws IOException {
if (theStore != null) {
theStore.force();
}
}
@Override
public void rollbackTransaction(ConnectionContext context) {
}
@Override
public void start() throws Exception {
initialize();
}
@Override
public void stop() throws Exception {
if (theStore != null) {
theStore.close();
}
}
@Override
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
@Override
public void deleteAllMessages() throws IOException {
if (theStore != null) {
if (theStore.isInitialized()) {
@ -268,6 +281,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
@Override
public void setUsageManager(SystemUsage usageManager) {
}
@ -279,12 +293,12 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
}
public boolean isPersistentIndex() {
return persistentIndex;
}
return persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
@ -319,10 +333,12 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return directory;
}
@Override
public String toString() {
return "KahaPersistenceAdapter(" + getStoreName() + ")";
}
@Override
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
@ -331,20 +347,24 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
return brokerName;
}
@Override
public File getDirectory() {
return this.directory;
}
@Override
public void setDirectory(File directory) {
this.directory = directory;
}
@Override
public void checkpoint(boolean sync) throws IOException {
if (sync) {
getStore().force();
}
}
@Override
public long size(){
return storeSize.get();
}
@ -367,10 +387,12 @@ public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerService
}
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public long getLastProducerSequenceId(ProducerId id) {
// reference store send has adequate duplicate suppression
return -1;