From 759fd2829ce147105c45962cb9d9d56bf5ff51c8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 6 Mar 2007 10:25:48 +0000 Subject: [PATCH] Deleted store implementations rapid and quick, as they are replaced by the AMQStore implementation git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515054 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/PersistenceAdapter.java | 43 +- .../store/PersistenceAdapterFactoryBean.java | 3 +- .../store/amq/AMQPersistenceAdapter.java | 745 ++++++++---------- .../amq/AMQPersistenceAdapterFactory.java | 114 +++ .../store/jdbc/JDBCPersistenceAdapter.java | 10 + .../journal/JournalPersistenceAdapter.java | 12 + .../JournalPersistenceAdapterFactory.java} | 22 +- .../store/kahadaptor/KahaMessageStore.java | 11 +- .../kahadaptor/KahaPersistenceAdapter.java | 126 +-- .../store/kahadaptor/KahaReferenceStore.java | 15 +- .../kahadaptor/KahaReferenceStoreAdapter.java | 14 +- .../kahadaptor/KahaTopicMessageStore.java | 4 +- .../kahadaptor/KahaTopicReferenceStore.java | 16 +- .../store/kahadaptor/TopicSubContainer.java | 18 +- .../memory/MemoryPersistenceAdapter.java | 9 + .../store/quick/QuickMessageStore.java | 495 ------------ .../store/quick/QuickPersistenceAdapter.java | 679 ---------------- .../store/quick/QuickTopicMessageStore.java | 211 ----- .../store/quick/QuickTransactionStore.java | 340 -------- .../store/quick/RecoveryListenerAdapter.java | 59 -- .../apache/activemq/store/quick/package.html | 25 - .../store/rapid/RapidMessageReference.java | 45 -- .../RapidMessageReferenceMarshaller.java | 46 -- .../store/rapid/RapidMessageStore.java | 379 --------- .../store/rapid/RapidPersistenceAdapter.java | 656 --------------- .../store/rapid/RapidTopicMessageStore.java | 319 -------- .../store/rapid/RapidTransactionStore.java | 303 ------- .../apache/activemq/store/rapid/package.html | 25 - 28 files changed, 661 insertions(+), 4083 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java rename activemq-core/src/main/java/org/apache/activemq/store/{DefaultPersistenceAdapterFactory.java => journal/JournalPersistenceAdapterFactory.java} (89%) delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/quick/package.html delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTransactionStore.java delete mode 100755 activemq-core/src/main/java/org/apache/activemq/store/rapid/package.html diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index ab72a225f5..d151b603aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.memory.UsageManager; +import java.io.File; import java.io.IOException; import java.util.Set; @@ -38,22 +39,30 @@ public interface PersistenceAdapter extends Service { * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination} * objects that the persistence store is aware exist. * - * @return + * @return active destinations */ public Set getDestinations(); /** * Factory method to create a new queue message store with the given destination name + * @param destination + * @return the message store + * @throws IOException */ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException; /** * Factory method to create a new topic message store with the given destination name + * @param destination + * @return the topic message store + * @throws IOException */ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException; /** * Factory method to create a new persistent prepared transaction store for XA recovery + * @return transaction store + * @throws IOException */ public TransactionStore createTransactionStore() throws IOException; @@ -66,27 +75,33 @@ public interface PersistenceAdapter extends Service { * real high performance its usually faster to perform many writes within the same * transaction to minimize latency caused by disk synchronization. This is especially * true when using tools like Berkeley Db or embedded JDBC servers. + * @param context + * @throws IOException */ public void beginTransaction(ConnectionContext context) throws IOException; /** * Commit a persistence transaction + * @param context + * @throws IOException * - * @see PersistenceAdapter#beginTransaction() + * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ public void commitTransaction(ConnectionContext context) throws IOException; /** * Rollback a persistence transaction + * @param context + * @throws IOException * - * @see PersistenceAdapter#beginTransaction() + * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ public void rollbackTransaction(ConnectionContext context) throws IOException; /** * - * @return + * @return last broker sequence * @throws IOException */ public long getLastMessageBrokerSequenceId() throws IOException; @@ -102,4 +117,24 @@ public interface PersistenceAdapter extends Service { * @param usageManager The UsageManager that is controlling the broker's memory usage. */ public void setUsageManager(UsageManager usageManager); + + /** + * Set the name of the broker using the adapter + * @param brokerName + */ + public void setBrokerName(String brokerName); + + /** + * Set the directory where any data files should be created + * @param dir + */ + public void setDirectory(File dir); + + /** + * checkpoint any + * @param sync + * @throws IOException + * + */ + public void checkpoint(boolean sync) throws IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java index 4dab6dae95..d3c5af6394 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java @@ -17,6 +17,7 @@ */ package org.apache.activemq.store; +import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; import org.springframework.beans.factory.FactoryBean; /** @@ -26,7 +27,7 @@ import org.springframework.beans.factory.FactoryBean; * * @version $Revision: 1.1 $ */ -public class PersistenceAdapterFactoryBean extends DefaultPersistenceAdapterFactory implements FactoryBean { +public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean { private PersistenceAdapter persistenceAdaptor; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 27c63af775..7fb5c14dac 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -1,20 +1,17 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.store.amq; import java.io.File; @@ -26,7 +23,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -54,7 +50,6 @@ import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.amq.AMQTransactionStore.Tx; import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation; import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; - import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; @@ -68,261 +63,231 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * An implementation of {@link PersistenceAdapter} designed for use with a - * {@link Journal} and then check pointing asynchronously on a timeout with some - * other long term persistent storage. + * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing + * asynchronously on a timeout with some other long term persistent storage. * * @org.apache.xbean.XBean * * @version $Revision: 1.17 $ */ -public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener { +public class AMQPersistenceAdapter implements PersistenceAdapter,UsageListener{ - private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class); - - private final ConcurrentHashMap queues = new ConcurrentHashMap(); - private final ConcurrentHashMap topics = new ConcurrentHashMap(); - + private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class); + private final ConcurrentHashMap queues=new ConcurrentHashMap(); + private final ConcurrentHashMap topics=new ConcurrentHashMap(); private AsyncDataManager asyncDataManager; - private KahaReferenceStoreAdapter referenceStoreAdapter; - private TaskRunnerFactory taskRunnerFactory; - private WireFormat wireFormat = new OpenWireFormat(); - + private ReferenceStoreAdapter referenceStoreAdapter; + private TaskRunnerFactory taskRunnerFactory; + private WireFormat wireFormat=new OpenWireFormat(); private UsageManager usageManager; - - private long cleanupInterval = 1000 * 60; - private long checkpointInterval = 1000 * 10; - - private int maxCheckpointWorkers = 1; - private int maxCheckpointMessageAddSize = 1024*4; - - private AMQTransactionStore transactionStore = new AMQTransactionStore(this); - + private long cleanupInterval=1000*60; + private long checkpointInterval=1000*10; + private int maxCheckpointWorkers=1; + private int maxCheckpointMessageAddSize=1024*4; + private AMQTransactionStore transactionStore=new AMQTransactionStore(this); private TaskRunner checkpointTask; - private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); - - private final AtomicBoolean started = new AtomicBoolean(false); + private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1); + private final AtomicBoolean started=new AtomicBoolean(false); private Runnable periodicCheckpointTask; - - private Runnable periodicCleanupTask; - private boolean deleteAllMessages; + private Runnable periodicCleanupTask; + private boolean deleteAllMessages; private boolean syncOnWrite; - private String brokerName; - private File directory; - + private String brokerName=""; + private File directory; - public AMQPersistenceAdapter() { - this("localhost"); - } - public AMQPersistenceAdapter(String brokerName) { - this.brokerName = brokerName; - this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName + "-amqstore"); + public String getBrokerName(){ + return this.brokerName; } - - public synchronized void start() throws Exception { - if( !started.compareAndSet(false, true) ) + public void setBrokerName(String brokerName){ + this.brokerName=brokerName; + if(this.referenceStoreAdapter!=null){ + this.referenceStoreAdapter.setBrokerName(brokerName); + } + } + + public synchronized void start() throws Exception{ + if(!started.compareAndSet(false,true)) return; - if (this.usageManager!=null) { + if(this.directory==null){ + this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName); + } + this.directory=new File(directory,"amqstore"); + this.directory.mkdirs(); + if(this.usageManager!=null){ this.usageManager.addUsageListener(this); } - - if( asyncDataManager == null ) { - asyncDataManager = createAsyncDataManager(); + if(asyncDataManager==null){ + asyncDataManager=createAsyncDataManager(); } - - if( referenceStoreAdapter==null ) { - referenceStoreAdapter = createReferenceStoreAdapter(); + if(referenceStoreAdapter==null){ + referenceStoreAdapter=createReferenceStoreAdapter(); } + referenceStoreAdapter.setDirectory(new File(directory,"kaha-reference-store")); + referenceStoreAdapter.setBrokerName(getBrokerName()); referenceStoreAdapter.setUsageManager(usageManager); - - if( taskRunnerFactory==null ) { - taskRunnerFactory = createTaskRunnerFactory(); + if(taskRunnerFactory==null){ + taskRunnerFactory=createTaskRunnerFactory(); } - - asyncDataManager.start(); - if( deleteAllMessages ) { - asyncDataManager.delete(); - try { - JournalTrace trace = new JournalTrace(); - trace.setMessage("DELETED "+new Date()); - Location location = asyncDataManager.write(wireFormat.marshal(trace), false); - asyncDataManager.setMark(location, true); - log.info("Journal deleted: "); - deleteAllMessages=false; - } catch (IOException e) { - throw e; - } catch (Throwable e) { - throw IOExceptionSupport.create(e); - } - - referenceStoreAdapter.deleteAllMessages(); + asyncDataManager.start(); + if(deleteAllMessages){ + asyncDataManager.delete(); + try{ + JournalTrace trace=new JournalTrace(); + trace.setMessage("DELETED "+new Date()); + Location location=asyncDataManager.write(wireFormat.marshal(trace),false); + asyncDataManager.setMark(location,true); + log.info("Journal deleted: "); + deleteAllMessages=false; + }catch(IOException e){ + throw e; + }catch(Throwable e){ + throw IOExceptionSupport.create(e); + } + referenceStoreAdapter.deleteAllMessages(); } referenceStoreAdapter.start(); - - Set files = referenceStoreAdapter.getReferenceFileIdsInUse(); - log.info("Active data files: "+files); - - checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ - public boolean iterate() { + Set files=referenceStoreAdapter.getReferenceFileIdsInUse(); + log.info("Active data files: "+files); + checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){ + + public boolean iterate(){ doCheckpoint(); return false; } - }, "ActiveMQ Journal Checkpoint Worker"); - + },"ActiveMQ Journal Checkpoint Worker"); createTransactionStore(); recover(); - // Do a checkpoint periodically. - periodicCheckpointTask = new Runnable() { - public void run() { - checkpoint(false); - } - }; - Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval); - - periodicCleanupTask = new Runnable() { - public void run() { - cleanup(); - } - }; - Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval); + periodicCheckpointTask=new Runnable(){ + public void run(){ + checkpoint(false); + } + }; + Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval); + periodicCleanupTask=new Runnable(){ + + public void run(){ + cleanup(); + } + }; + Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval); } - - public void stop() throws Exception { - - if( !started.compareAndSet(true, false) ) + public void stop() throws Exception{ + if(!started.compareAndSet(true,false)) return; - - this.usageManager.removeUsageListener(this); + this.usageManager.removeUsageListener(this); Scheduler.cancel(periodicCheckpointTask); Scheduler.cancel(periodicCleanupTask); - - - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - AMQMessageStore ms = iterator.next(); + Iterator iterator=queues.values().iterator(); + while(iterator.hasNext()){ + AMQMessageStore ms=iterator.next(); ms.stop(); } - - iterator = topics.values().iterator(); - while (iterator.hasNext()) { - final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next(); + iterator=topics.values().iterator(); + while(iterator.hasNext()){ + final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next(); ms.stop(); } - // Take one final checkpoint and stop checkpoint processing. checkpoint(true); - checkpointTask.shutdown(); - + checkpointTask.shutdown(); queues.clear(); topics.clear(); - - IOException firstException = null; + IOException firstException=null; referenceStoreAdapter.stop(); - try { + try{ log.debug("Journal close"); asyncDataManager.close(); - } catch (Exception e) { - firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); + }catch(Exception e){ + firstException=IOExceptionSupport.create("Failed to close journals: "+e,e); } - - if (firstException != null) { + if(firstException!=null){ throw firstException; } } - /** * When we checkpoint we move all the journalled data to long term storage. - * @param stopping + * + * @param stopping * * @param b */ - public void checkpoint(boolean sync) { - try { - if (asyncDataManager == null ) + public void checkpoint(boolean sync){ + try{ + if(asyncDataManager==null) throw new IllegalStateException("Journal is closed."); - - CountDownLatch latch = null; - synchronized(this) { - latch = nextCheckpointCountDownLatch; + CountDownLatch latch=null; + synchronized(this){ + latch=nextCheckpointCountDownLatch; } - checkpointTask.wakeup(); - - if (sync) { + if(sync){ if(log.isDebugEnabled()){ log.debug("Waitng for checkpoint to complete."); } latch.await(); } - } - catch (InterruptedException e) { + referenceStoreAdapter.checkpoint(sync); + }catch(InterruptedException e){ Thread.currentThread().interrupt(); - log.warn("Request to start checkpoint failed: " + e, e); + log.warn("Request to start checkpoint failed: "+e,e); + }catch(IOException e){ + log.error("checkpoint failed: "+e,e); } } - + /** * This does the actual checkpoint. - * @return + * + * @return */ - public boolean doCheckpoint() { - CountDownLatch latch = null; - synchronized(this) { - latch = nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch = new CountDownLatch(1); - } - try { - + public boolean doCheckpoint(){ + CountDownLatch latch=null; + synchronized(this){ + latch=nextCheckpointCountDownLatch; + nextCheckpointCountDownLatch=new CountDownLatch(1); + } + try{ if(log.isDebugEnabled()){ log.debug("Checkpoint started."); } - referenceStoreAdapter.sync(); - Location newMark = null; - - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - final AMQMessageStore ms = iterator.next(); - Location mark = (Location) ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; + + Location newMark=null; + Iterator iterator=queues.values().iterator(); + while(iterator.hasNext()){ + final AMQMessageStore ms=iterator.next(); + Location mark=(Location)ms.getMark(); + if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ + newMark=mark; } } - - iterator = topics.values().iterator(); - while (iterator.hasNext()) { - final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next(); - Location mark = (Location) ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; + iterator=topics.values().iterator(); + while(iterator.hasNext()){ + final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next(); + Location mark=(Location)ms.getMark(); + if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ + newMark=mark; } } - - try { - if (newMark != null) { + try{ + if(newMark!=null){ if(log.isDebugEnabled()){ - log.debug("Marking journal at: " + newMark); + log.debug("Marking journal at: "+newMark); } - asyncDataManager.setMark(newMark, false); - writeTraceMessage("CHECKPOINT "+new Date(), true); + asyncDataManager.setMark(newMark,false); + writeTraceMessage("CHECKPOINT "+new Date(),true); } + }catch(Exception e){ + log.error("Failed to mark the Journal: "+e,e); } - catch (Exception e) { - log.error("Failed to mark the Journal: " + e, e); - } - if(log.isDebugEnabled()){ log.debug("Checkpoint done."); } - } - catch(IOException e) { - log.error("Failed to sync reference store",e); - } - finally { + }finally{ latch.countDown(); } return true; @@ -330,197 +295,183 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener /** * Cleans up the data files - * @return - * @throws IOException + * + * @return + * @throws IOException */ - public void cleanup() { - - try { - Set inUse = referenceStoreAdapter.getReferenceFileIdsInUse(); - asyncDataManager.consolidateDataFilesNotIn(inUse); - } catch (IOException e) { - log.error("Could not cleanup data files: "+e, e); - } - + public void cleanup(){ + try{ + Set inUse=referenceStoreAdapter.getReferenceFileIdsInUse(); + asyncDataManager.consolidateDataFilesNotIn(inUse); + }catch(IOException e){ + log.error("Could not cleanup data files: "+e,e); + } } - - public Set getDestinations() { - Set destinations = new HashSet(referenceStoreAdapter.getDestinations()); + public Set getDestinations(){ + Set destinations=new HashSet(referenceStoreAdapter.getDestinations()); destinations.addAll(queues.keySet()); destinations.addAll(topics.keySet()); return destinations; } - private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { - if (destination.isQueue()) { - return createQueueMessageStore((ActiveMQQueue) destination); - } - else { - return createTopicMessageStore((ActiveMQTopic) destination); + private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{ + if(destination.isQueue()){ + return createQueueMessageStore((ActiveMQQueue)destination); + }else{ + return createTopicMessageStore((ActiveMQTopic)destination); } } - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - AMQMessageStore store = queues.get(destination); - if (store == null) { - ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); - store = new AMQMessageStore(this, checkpointStore, destination); - try { - store.start(); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - queues.put(destination, store); + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ + AMQMessageStore store=queues.get(destination); + if(store==null){ + ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination); + store=new AMQMessageStore(this,checkpointStore,destination); + try{ + store.start(); + }catch(Exception e){ + throw IOExceptionSupport.create(e); + } + queues.put(destination,store); } return store; } - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { - AMQTopicMessageStore store = (AMQTopicMessageStore) topics.get(destinationName); - if (store == null) { - TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); - store = new AMQTopicMessageStore(this, checkpointStore, destinationName); - try { - store.start(); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - topics.put(destinationName, store); + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{ + AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName); + if(store==null){ + TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName); + store=new AMQTopicMessageStore(this,checkpointStore,destinationName); + try{ + store.start(); + }catch(Exception e){ + throw IOExceptionSupport.create(e); + } + topics.put(destinationName,store); } return store; } - public TransactionStore createTransactionStore() throws IOException { + public TransactionStore createTransactionStore() throws IOException{ return transactionStore; } - public long getLastMessageBrokerSequenceId() throws IOException { + public long getLastMessageBrokerSequenceId() throws IOException{ return referenceStoreAdapter.getLastMessageBrokerSequenceId(); } - public void beginTransaction(ConnectionContext context) throws IOException { + public void beginTransaction(ConnectionContext context) throws IOException{ referenceStoreAdapter.beginTransaction(context); } - public void commitTransaction(ConnectionContext context) throws IOException { + public void commitTransaction(ConnectionContext context) throws IOException{ referenceStoreAdapter.commitTransaction(context); } - public void rollbackTransaction(ConnectionContext context) throws IOException { + public void rollbackTransaction(ConnectionContext context) throws IOException{ referenceStoreAdapter.rollbackTransaction(context); } - /** * @param location * @return * @throws IOException */ - public DataStructure readCommand(Location location) throws IOException { - try { - ByteSequence packet = asyncDataManager.read(location); - return (DataStructure) wireFormat.unmarshal(packet); - } catch (IOException e) { - throw createReadException(location, e); + public DataStructure readCommand(Location location) throws IOException{ + try{ + ByteSequence packet=asyncDataManager.read(location); + return (DataStructure)wireFormat.unmarshal(packet); + }catch(IOException e){ + throw createReadException(location,e); } } /** - * Move all the messages that were in the journal into long term storage. We - * just replay and do a checkpoint. + * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint. * * @throws IOException * @throws IOException * @throws InvalidLocationException * @throws IllegalStateException */ - private void recover() throws IllegalStateException, IOException { - - Location pos = null; - int redoCounter = 0; - - log.info("Journal Recovery Started from: " + asyncDataManager); - long start = System.currentTimeMillis(); - ConnectionContext context = new ConnectionContext(); - + private void recover() throws IllegalStateException,IOException{ + Location pos=null; + int redoCounter=0; + log.info("Journal Recovery Started from: "+asyncDataManager); + long start=System.currentTimeMillis(); + ConnectionContext context=new ConnectionContext(); // While we have records in the journal. - while ((pos = asyncDataManager.getNextLocation(pos)) != null) { - ByteSequence data = asyncDataManager.read(pos); - DataStructure c = (DataStructure) wireFormat.unmarshal(data); - - if (c instanceof Message ) { - Message message = (Message) c; - AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination()); - if ( message.isInTransaction()) { - transactionStore.addMessage(store, message, pos); - } - else { - if( store.replayAddMessage(context, message, pos) ) { - redoCounter++; + while((pos=asyncDataManager.getNextLocation(pos))!=null){ + ByteSequence data=asyncDataManager.read(pos); + DataStructure c=(DataStructure)wireFormat.unmarshal(data); + if(c instanceof Message){ + Message message=(Message)c; + AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination()); + if(message.isInTransaction()){ + transactionStore.addMessage(store,message,pos); + }else{ + if(store.replayAddMessage(context,message,pos)){ + redoCounter++; } } - } else { - switch (c.getDataStructureType()) { - case JournalQueueAck.DATA_STRUCTURE_TYPE: - { - JournalQueueAck command = (JournalQueueAck) c; - AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination()); - if (command.getMessageAck().isInTransaction()) { - transactionStore.removeMessage(store, command.getMessageAck(), pos); - } - else { - if( store.replayRemoveMessage(context, command.getMessageAck()) ) { - redoCounter++; + }else{ + switch(c.getDataStructureType()){ + case JournalQueueAck.DATA_STRUCTURE_TYPE: { + JournalQueueAck command=(JournalQueueAck)c; + AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination()); + if(command.getMessageAck().isInTransaction()){ + transactionStore.removeMessage(store,command.getMessageAck(),pos); + }else{ + if(store.replayRemoveMessage(context,command.getMessageAck())){ + redoCounter++; } } } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: - { - JournalTopicAck command = (JournalTopicAck) c; - AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination()); - if (command.getTransactionId() != null) { - transactionStore.acknowledge(store, command, pos); - } - else { - if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) { - redoCounter++; + break; + case JournalTopicAck.DATA_STRUCTURE_TYPE: { + JournalTopicAck command=(JournalTopicAck)c; + AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination()); + if(command.getTransactionId()!=null){ + transactionStore.acknowledge(store,command,pos); + }else{ + if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command + .getMessageId())){ + redoCounter++; } } } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: - { - JournalTransaction command = (JournalTransaction) c; - try { + break; + case JournalTransaction.DATA_STRUCTURE_TYPE: { + JournalTransaction command=(JournalTransaction)c; + try{ // Try to replay the packet. - switch (command.getType()) { + switch(command.getType()){ case JournalTransaction.XA_PREPARE: transactionStore.replayPrepare(command.getTransactionId()); break; case JournalTransaction.XA_COMMIT: case JournalTransaction.LOCAL_COMMIT: - Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); - if (tx == null) + Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared()); + if(tx==null) break; // We may be trying to replay a commit that - // was already committed. - + // was already committed. // Replay the committed operations. tx.getOperations(); - for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); - if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - if( op.store.replayAddMessage(context, (Message)op.data, op.location) ) + for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){ + TxOperation op=(TxOperation)iter.next(); + if(op.operationType==TxOperation.ADD_OPERATION_TYPE){ + if(op.store.replayAddMessage(context,(Message)op.data,op.location)) redoCounter++; } - if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - if( op.store.replayRemoveMessage(context, (MessageAck) op.data) ) + if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){ + if(op.store.replayRemoveMessage(context,(MessageAck)op.data)) redoCounter++; } - if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { - JournalTopicAck ack = (JournalTopicAck) op.data; - if( ((AMQTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) { + if(op.operationType==TxOperation.ACK_OPERATION_TYPE){ + JournalTopicAck ack=(JournalTopicAck)op.data; + if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack + .getSubscritionName(),ack.getMessageId())){ redoCounter++; } } @@ -531,42 +482,40 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener transactionStore.replayRollback(command.getTransactionId()); break; } - } - catch (IOException e) { - log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); + }catch(IOException e){ + log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e); } } - break; + break; case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace = (JournalTrace) c; - log.debug("TRACE Entry: " + trace.getMessage()); + JournalTrace trace=(JournalTrace)c; + log.debug("TRACE Entry: "+trace.getMessage()); break; default: - log.error("Unknown type of record in transaction log which will be discarded: " + c); + log.error("Unknown type of record in transaction log which will be discarded: "+c); } } } - Location location = writeTraceMessage("RECOVERED "+new Date(), true); - asyncDataManager.setMark(location, true); - long end = System.currentTimeMillis(); - - log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds."); + Location location=writeTraceMessage("RECOVERED "+new Date(),true); + asyncDataManager.setMark(location,true); + long end=System.currentTimeMillis(); + log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds."); } - private IOException createReadException(Location location, Exception e) { - return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); + private IOException createReadException(Location location,Exception e){ + return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e); } - protected IOException createWriteException(DataStructure packet, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); + protected IOException createWriteException(DataStructure packet,Exception e){ + return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e); } - protected IOException createWriteException(String command, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); + protected IOException createWriteException(String command,Exception e){ + return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e); } - protected IOException createRecoveryFailedException(Exception e) { - return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); + protected IOException createRecoveryFailedException(Exception e){ + return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e); } /** @@ -576,118 +525,119 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener * @return * @throws IOException */ - public Location writeCommand(DataStructure command, boolean syncHint) throws IOException { - return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite)); + public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{ + return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite)); } - private Location writeTraceMessage(String message, boolean sync) throws IOException { - JournalTrace trace = new JournalTrace(); + private Location writeTraceMessage(String message,boolean sync) throws IOException{ + JournalTrace trace=new JournalTrace(); trace.setMessage(message); - return writeCommand(trace, sync); + return writeCommand(trace,sync); } - public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { - newPercentUsage = ((newPercentUsage)/10)*10; - oldPercentUsage = ((oldPercentUsage)/10)*10; - if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { + public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ + newPercentUsage=((newPercentUsage)/10)*10; + oldPercentUsage=((oldPercentUsage)/10)*10; + if(newPercentUsage>=70&&oldPercentUsage topics=new ConcurrentHashMap(); - ConcurrentHashMap queues=new ConcurrentHashMap(); - ConcurrentHashMap messageStores=new ConcurrentHashMap(); + ConcurrentHashMap topics=new ConcurrentHashMap(); + ConcurrentHashMap queues=new ConcurrentHashMap(); + ConcurrentHashMap messageStores=new ConcurrentHashMap(); protected OpenWireFormat wireFormat=new OpenWireFormat(); private long maxDataFileLength=32*1024*1024; - - - private File dir; + private File directory; + private String brokerName; private Store theStore; - - public KahaPersistenceAdapter(File dir) throws IOException{ - if(!dir.exists()){ - dir.mkdirs(); - } - this.dir=dir; - wireFormat.setCacheEnabled(false); - wireFormat.setTightEncodingEnabled(true); - } + private boolean initialized; public Set getDestinations(){ Set rc=new HashSet(); @@ -81,7 +73,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ Object obj=i.next(); if(obj instanceof ActiveMQDestination){ - rc.add((ActiveMQDestination) obj); + rc.add((ActiveMQDestination)obj); } } }catch(IOException e){ @@ -127,25 +119,25 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ } public TransactionStore createTransactionStore() throws IOException{ - if(transactionStore==null){ - while (true) { - try { - Store store=getStore(); - MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); - container.setKeyMarshaller(new CommandMarshaller(wireFormat)); - container.setValueMarshaller(new TransactionMarshaller(wireFormat)); - container.load(); - transactionStore=new KahaTransactionStore(this,container); - break; - }catch(StoreLockedExcpetion e) { - log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)+" seconds for the Store to be unlocked."); + while(true){ + try{ + Store store=getStore(); + MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); + container.setKeyMarshaller(new CommandMarshaller(wireFormat)); + container.setValueMarshaller(new TransactionMarshaller(wireFormat)); + container.load(); + transactionStore=new KahaTransactionStore(this,container); + break; + }catch(StoreLockedExcpetion e){ + log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000) + +" seconds for the Store to be unlocked."); try{ Thread.sleep(STORE_LOCKED_WAIT_DELAY); }catch(InterruptedException e1){ } } - } + } } return transactionStore; } @@ -163,6 +155,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ } public void start() throws Exception{ + initialize(); } public void stop() throws Exception{ @@ -182,37 +175,37 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ }else{ theStore.delete(); } - }else { + }else{ StoreFactory.delete(getStoreName()); } } protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ Store store=getStore(); - MapContainer container=store.getMapContainer(id,containerName); + MapContainer container=store.getMapContainer(id,containerName); container.setKeyMarshaller(new MessageIdMarshaller()); - container.setValueMarshaller(new MessageMarshaller(wireFormat)); + container.setValueMarshaller(new MessageMarshaller(wireFormat)); container.load(); return container; } - + protected MapContainer getSubsMapContainer(Object id,String containerName) throws IOException{ Store store=getStore(); - MapContainer container=store.getMapContainer(id,containerName); + MapContainer container=store.getMapContainer(id,containerName); container.setKeyMarshaller(Store.StringMarshaller); - container.setValueMarshaller(createMessageMarshaller()); + container.setValueMarshaller(createMessageMarshaller()); container.load(); return container; } - protected Marshaller createMessageMarshaller() { - return new CommandMarshaller(wireFormat); - } + protected Marshaller createMessageMarshaller(){ + return new CommandMarshaller(wireFormat); + } - protected ListContainer getListContainer(Object id,String containerName) throws IOException{ + protected ListContainer getListContainer(Object id,String containerName) throws IOException{ Store store=getStore(); ListContainer container=store.getListContainer(id,containerName); - container.setMarshaller(createMessageMarshaller()); + container.setMarshaller(createMessageMarshaller()); container.load(); return container; } @@ -239,8 +232,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ this.maxDataFileLength=maxDataFileLength; } - - protected synchronized Store getStore() throws IOException{ if(theStore==null){ theStore=StoreFactory.open(getStoreName(),"rw"); @@ -248,13 +239,50 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ } return theStore; } - + private String getStoreName(){ - String name=dir.getAbsolutePath()+File.separator+"kaha.db"; - return name; + initialize(); + return directory.getAbsolutePath(); + } + + public String toString(){ + return "KahaPersistenceAdapter("+getStoreName()+")"; + } + + public void setBrokerName(String brokerName){ + this.brokerName=brokerName; } - public String toString(){ - return "KahaPersistenceAdapter(" + getStoreName() +")"; + public String getBrokerName(){ + return brokerName; } + + public File getDirectory(){ + return this.directory; + } + + public void setDirectory(File directory){ + this.directory=directory; + } + + public void checkpoint(boolean sync) throws IOException{ + if(sync){ + getStore().force(); + } + } + + private void initialize(){ + if(!initialized){ + initialized=true; + if(this.directory==null){ + this.directory=new File(IOHelper.getDefaultDataDirectory()); + this.directory=new File(this.directory,brokerName+"-kahastore"); + } + this.directory.mkdirs(); + wireFormat.setCacheEnabled(false); + wireFormat.setTightEncodingEnabled(true); + } + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 6acd860c0c..b6a6d22a14 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -77,7 +77,9 @@ public class KahaReferenceStore implements ReferenceStore{ entry=messageContainer.getFirst(); }else{ entry=messageContainer.refresh(entry); + if (entry != null) { entry=messageContainer.getNext(entry); + } } if(entry!=null){ int count=0; @@ -120,11 +122,14 @@ public class KahaReferenceStore implements ReferenceStore{ } public synchronized void removeMessage(MessageId msgId) throws IOException{ - ReferenceRecord rr=messageContainer.remove(msgId); - if(rr!=null){ - removeInterest(rr); - if(messageContainer.isEmpty()){ - resetBatching(); + StoreEntry entry=messageContainer.getEntry(msgId); + if(entry!=null){ + ReferenceRecord rr=messageContainer.remove(msgId); + if(rr!=null){ + removeInterest(rr); + if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){ + resetBatching(); + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index bae8539a9f..23612657c9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -17,10 +17,8 @@ */ package org.apache.activemq.store.kahadaptor; -import java.io.File; import java.io.IOException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,7 +27,6 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MessageIdMarshaller; @@ -39,7 +36,6 @@ import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStoreAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,10 +47,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements private MaprecordReferences = new HashMap(); private boolean storeValid; - public KahaReferenceStoreAdapter(File dir) throws IOException { - super(dir); - } - + public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ throw new RuntimeException("Use createQueueReferenceStore instead"); } @@ -164,10 +157,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements } } - public void sync() throws IOException { - getStore().force(); - } - + protected MapContainer getMapReferenceContainer(Object id,String containerName) throws IOException{ Store store=getStore(); MapContainer container=store.getMapContainer(id,containerName); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index ab88617e94..07b2e53301 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -119,8 +119,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess } // add the subscriber ListContainer container=addSubscriberMessageContainer(key); + /* if(retroactive){ - for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(entry); @@ -128,6 +129,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess container.add(ref); } } + */ } public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 434008a93e..74be20329a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -17,7 +17,6 @@ package org.apache.activemq.store.kahadaptor; import java.io.IOException; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -85,7 +84,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); - container.add(ref); + StoreEntry listEntry = container.add(ref); + } } } @@ -118,8 +118,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, MessageId messageId) throws IOException{ - String subcriberId=getSubscriptionKey(clientId,subscriptionName); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); + String key=getSubscriptionKey(clientId,subscriptionName); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ ConsumerMessageRef ref=container.remove(); if(container.isEmpty()){ @@ -140,6 +140,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic removeInterest(rr); } }else{ + ackContainer.update(ref.getAckEntry(),tsa); } } @@ -163,13 +164,15 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic // add the subscriber ListContainer container=addSubscriberMessageContainer(key); if(retroactive){ - for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + /* + for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); } + */ } } @@ -186,7 +189,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic public int getMessageCount(String clientId,String subscriberName) throws IOException{ String key=getSubscriptionKey(clientId,subscriberName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - return container.size(); + return container != null ? container.size() : 0; } public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{ @@ -226,6 +229,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) throws Exception{ + String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index 42cfb79232..cd822e0465 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -54,14 +54,20 @@ import org.apache.activemq.kaha.StoreEntry; return listContainer.isEmpty(); } - public void add(ConsumerMessageRef ref) { - listContainer.add(ref); + public StoreEntry add(ConsumerMessageRef ref) { + return listContainer.placeLast(ref); } - public ConsumerMessageRef remove() { - ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst(); - if (listContainer.isEmpty()) { - reset(); + public ConsumerMessageRef remove(){ + ConsumerMessageRef result=null; + if(!listContainer.isEmpty()){ + StoreEntry entry=listContainer.getFirst(); + if(entry!=null){ + result=(ConsumerMessageRef)listContainer.removeFirst(); + if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){ + reset(); + } + } } return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 3a46b2415b..79e1690f7d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -158,4 +158,13 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { public String toString(){ return "MemoryPersistenceAdapter"; } + + public void setBrokerName(String brokerName){ + } + + public void setDirectory(File dir){ + } + + public void checkpoint(boolean sync) throws IOException{ + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java deleted file mode 100644 index 3b95d8a17b..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.quick; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map.Entry; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.JournalQueueAck; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.impl.async.Location; -import org.apache.activemq.memory.UsageManager; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.ReferenceStore; -import org.apache.activemq.store.ReferenceStore.ReferenceData; -import org.apache.activemq.thread.Task; -import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.TransactionTemplate; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A MessageStore that uses a Journal to store it's messages. - * - * @version $Revision: 1.14 $ - */ -public class QuickMessageStore implements MessageStore { - - private static final Log log = LogFactory.getLog(QuickMessageStore.class); - - protected final QuickPersistenceAdapter peristenceAdapter; - protected final QuickTransactionStore transactionStore; - protected final ReferenceStore referenceStore; - protected final ActiveMQDestination destination; - protected final TransactionTemplate transactionTemplate; - - private LinkedHashMap messages = new LinkedHashMap(); - private ArrayList messageAcks = new ArrayList(); - - /** A MessageStore that we can use to retrieve messages quickly. */ - private LinkedHashMap cpAddedMessageIds; - - protected Location lastLocation; - protected Location lastWrittenLocation; - - protected HashSet inFlightTxLocations = new HashSet(); - - protected final TaskRunner asyncWriteTask; - protected CountDownLatch flushLatch; - private final AtomicReference mark = new AtomicReference(); - - public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { - this.peristenceAdapter = adapter; - this.transactionStore = adapter.getTransactionStore(); - this.referenceStore = referenceStore; - this.destination = destination; - this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); - - asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){ - public boolean iterate() { - asyncWrite(); - return false; - }}, "Checkpoint: "+destination); - } - - public void setUsageManager(UsageManager usageManager) { - referenceStore.setUsageManager(usageManager); - } - - - /** - * Not synchronized since the Journal has better throughput if you increase - * the number of concurrent writes that it is doing. - */ - public void addMessage(ConnectionContext context, final Message message) throws IOException { - - final MessageId id = message.getMessageId(); - - final boolean debug = log.isDebugEnabled(); - - final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message add for: "+id+", at: "+location); - addMessage(message, location); - } else { - if( debug ) - log.debug("Journalled transacted message add for: "+id+", at: "+location); - synchronized( this ) { - inFlightTxLocations.add(location); - } - transactionStore.addMessage(this, message, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message add commit for: "+id+", at: "+location); - synchronized( QuickMessageStore.this ) { - inFlightTxLocations.remove(location); - addMessage(message, location); - } - } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message add rollback for: "+id+", at: "+location); - synchronized( QuickMessageStore.this ) { - inFlightTxLocations.remove(location); - } - } - }); - } - } - - private void addMessage(final Message message, final Location location) throws InterruptedIOException { - ReferenceData data = new ReferenceData(); - data.setExpiration(message.getExpiration()); - data.setFileId(location.getDataFileId()); - data.setOffset(location.getOffset()); - synchronized (this) { - lastLocation = location; - messages.put(message.getMessageId(), data); - } - try { - asyncWriteTask.wakeup(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - - public boolean replayAddMessage(ConnectionContext context, Message message, Location location) { - MessageId id = message.getMessageId(); - try { - // Only add the message if it has not already been added. - ReferenceData data = referenceStore.getMessageReference(id); - if( data==null ) { - data = new ReferenceData(); - data.setExpiration(message.getExpiration()); - data.setFileId(location.getDataFileId()); - data.setOffset(location.getOffset()); - referenceStore.addMessageReference(context, id, data); - return true; - } - } - catch (Throwable e) { - log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e); - } - return false; - } - - /** - */ - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { - final boolean debug = log.isDebugEnabled(); - JournalQueueAck remove = new JournalQueueAck(); - remove.setDestination(destination); - remove.setMessageAck(ack); - - final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); - removeMessage(ack, location); - } else { - if( debug ) - log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); - synchronized( this ) { - inFlightTxLocations.add(location); - } - transactionStore.removeMessage(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); - synchronized( QuickMessageStore.this ) { - inFlightTxLocations.remove(location); - removeMessage(ack, location); - } - } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); - synchronized( QuickMessageStore.this ) { - inFlightTxLocations.remove(location); - } - } - }); - - } - } - - private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { - ReferenceData data; - synchronized (this) { - lastLocation = location; - MessageId id = ack.getLastMessageId(); - data = messages.remove(id); - if (data == null) { - messageAcks.add(ack); - } - } - - if (data == null) { - try { - asyncWriteTask.wakeup(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - } - - public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { - try { - // Only remove the message if it has not already been removed. - ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId()); - if( t!=null ) { - referenceStore.removeMessage(context, messageAck); - return true; - } - } - catch (Throwable e) { - log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); - } - return false; - } - - /** - * Waits till the lastest data has landed on the referenceStore - * @throws InterruptedIOException - */ - public void flush() throws InterruptedIOException { - log.debug("flush"); - CountDownLatch countDown; - synchronized(this) { - if( lastWrittenLocation == lastLocation ) { - return; - } - if( flushLatch== null ) { - flushLatch = new CountDownLatch(1); - } - countDown = flushLatch; - } - try { - asyncWriteTask.wakeup(); - countDown.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - - /** - * @return - * @throws IOException - */ - private void asyncWrite() { - try { - CountDownLatch countDown; - synchronized(this) { - countDown = flushLatch; - flushLatch = null; - } - - mark.set(doAsyncWrite()); - - if ( countDown != null ) { - countDown.countDown(); - } - } catch (IOException e) { - log.error("Checkpoint failed: "+e, e); - } - } - - /** - * @return - * @throws IOException - */ - protected Location doAsyncWrite() throws IOException { - - final ArrayList cpRemovedMessageLocations; - final ArrayList cpActiveJournalLocations; - final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); - final Location lastLocation; - - // swap out the message hash maps.. - synchronized (this) { - cpAddedMessageIds = this.messages; - cpRemovedMessageLocations = this.messageAcks; - cpActiveJournalLocations=new ArrayList(inFlightTxLocations); - this.messages = new LinkedHashMap(); - this.messageAcks = new ArrayList(); - lastLocation = this.lastLocation; - } - - if( log.isDebugEnabled() ) - log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "+cpRemovedMessageLocations.size()+" "); - - transactionTemplate.run(new Callback() { - public void execute() throws Exception { - - int size = 0; - - PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); - ConnectionContext context = transactionTemplate.getContext(); - - // Checkpoint the added messages. - Iterator> iterator = cpAddedMessageIds.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - try { - referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() ); - } catch (Throwable e) { - log.warn("Message could not be added to long term store: " + e.getMessage(), e); - } - - size ++; - - // Commit the batch if it's getting too big - if( size >= maxCheckpointMessageAddSize ) { - persitanceAdapter.commitTransaction(context); - persitanceAdapter.beginTransaction(context); - size=0; - } - } - - persitanceAdapter.commitTransaction(context); - persitanceAdapter.beginTransaction(context); - - // Checkpoint the removed messages. - for (MessageAck ack : cpRemovedMessageLocations) { - try { - referenceStore.removeMessage(transactionTemplate.getContext(), ack); - } catch (Throwable e) { - log.debug("Message could not be removed from long term store: " + e.getMessage(), e); - } - } - - } - - }); - - log.debug("Batch update done."); - - synchronized (this) { - cpAddedMessageIds = null; - lastWrittenLocation = lastLocation; - } - - if( cpActiveJournalLocations.size() > 0 ) { - Collections.sort(cpActiveJournalLocations); - return cpActiveJournalLocations.get(0); - } else { - return lastLocation; - } - } - - /** - * - */ - public Message getMessage(MessageId identity) throws IOException { - - ReferenceData data=null; - - synchronized (this) { - // Is it still in flight??? - data = messages.get(identity); - if( data==null && cpAddedMessageIds!=null ) { - data = cpAddedMessageIds.get(identity); - } - } - - if( data==null ) { - data = referenceStore.getMessageReference(identity); - if( data==null ) { - return null; - } - } - - Location location = new Location(); - location.setDataFileId(data.getFileId()); - location.setOffset(data.getOffset()); - - DataStructure rc = peristenceAdapter.readCommand(location); - - try { - return (Message) rc; - } catch (ClassCastException e) { - throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc); - } - } - - /** - * Replays the referenceStore first as those messages are the oldest ones, - * then messages are replayed from the transaction log and then the cache is - * updated. - * - * @param listener - * @throws Exception - */ - public void recover(final MessageRecoveryListener listener) throws Exception { - flush(); - referenceStore.recover(new RecoveryListenerAdapter(this, listener)); - } - - public void start() throws Exception { - referenceStore.start(); - } - - public void stop() throws Exception { - asyncWriteTask.shutdown(); - referenceStore.stop(); - } - - /** - * @return Returns the longTermStore. - */ - public ReferenceStore getReferenceStore() { - return referenceStore; - } - - /** - * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) - */ - public void removeAllMessages(ConnectionContext context) throws IOException { - flush(); - referenceStore.removeAllMessages(context); - } - - public ActiveMQDestination getDestination() { - return destination; - } - - public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { - throw new IOException("The journal does not support message references."); - } - - public String getMessageReference(MessageId identity) throws IOException { - throw new IOException("The journal does not support message references."); - } - - /** - * @return - * @throws IOException - * @see org.apache.activemq.store.MessageStore#getMessageCount() - */ - public int getMessageCount() throws IOException{ - flush(); - return referenceStore.getMessageCount(); - } - - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ - flush(); - referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener)); - - } - - - public void resetBatching(){ - referenceStore.resetBatching(); - - } - - public Location getMark() { - return mark.get(); - } - -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java deleted file mode 100644 index d90131ae53..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java +++ /dev/null @@ -1,679 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.quick; - -import java.io.File; -import java.io.IOException; -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activeio.journal.Journal; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.JournalQueueAck; -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.JournalTrace; -import org.apache.activemq.command.JournalTransaction; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.kaha.impl.async.AsyncDataManager; -import org.apache.activemq.kaha.impl.async.Location; -import org.apache.activemq.memory.UsageListener; -import org.apache.activemq.memory.UsageManager; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.ReferenceStore; -import org.apache.activemq.store.ReferenceStoreAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; -import org.apache.activemq.store.quick.QuickTransactionStore.Tx; -import org.apache.activemq.store.quick.QuickTransactionStore.TxOperation; -import org.apache.activemq.thread.DefaultThreadPools; -import org.apache.activemq.thread.Scheduler; -import org.apache.activemq.thread.Task; -import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * An implementation of {@link PersistenceAdapter} designed for use with a - * {@link Journal} and then check pointing asynchronously on a timeout with some - * other long term persistent storage. - * - * @org.apache.xbean.XBean - * - * @version $Revision: 1.17 $ - */ -public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListener { - - private static final Log log = LogFactory.getLog(QuickPersistenceAdapter.class); - - private final ConcurrentHashMap queues = new ConcurrentHashMap(); - private final ConcurrentHashMap topics = new ConcurrentHashMap(); - - private AsyncDataManager asyncDataManager; - private ReferenceStoreAdapter referenceStoreAdapter; - private TaskRunnerFactory taskRunnerFactory; - private WireFormat wireFormat = new OpenWireFormat(); - - private UsageManager usageManager; - - private long cleanupInterval = 1000 * 60; - private long checkpointInterval = 1000 * 10; - - private int maxCheckpointWorkers = 1; - private int maxCheckpointMessageAddSize = 1024*4; - - private QuickTransactionStore transactionStore = new QuickTransactionStore(this); - - private TaskRunner checkpointTask; - private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); - - private final AtomicBoolean started = new AtomicBoolean(false); - private Runnable periodicCheckpointTask; - - private Runnable periodicCleanupTask; - private boolean deleteAllMessages; - private File directory = new File(IOHelper.getDefaultDataDirectory() + "/quick"); - - - - public synchronized void start() throws Exception { - if( !started.compareAndSet(false, true) ) - return; - this.usageManager.addUsageListener(this); - - if( asyncDataManager == null ) { - asyncDataManager = createAsyncDataManager(); - } - - if( referenceStoreAdapter==null ) { - referenceStoreAdapter = createReferenceStoreAdapter(); - } - referenceStoreAdapter.setUsageManager(usageManager); - - if( taskRunnerFactory==null ) { - taskRunnerFactory = createTaskRunnerFactory(); - } - - asyncDataManager.start(); - if( deleteAllMessages ) { - asyncDataManager.delete(); - try { - JournalTrace trace = new JournalTrace(); - trace.setMessage("DELETED "+new Date()); - Location location = asyncDataManager.write(wireFormat.marshal(trace), false); - asyncDataManager.setMark(location, true); - log.info("Journal deleted: "); - deleteAllMessages=false; - } catch (IOException e) { - throw e; - } catch (Throwable e) { - throw IOExceptionSupport.create(e); - } - - referenceStoreAdapter.deleteAllMessages(); - } - referenceStoreAdapter.start(); - - Set files = referenceStoreAdapter.getReferenceFileIdsInUse(); - log.info("Active data files: "+files); - - checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ - public boolean iterate() { - doCheckpoint(); - return false; - } - }, "ActiveMQ Journal Checkpoint Worker"); - - createTransactionStore(); - recover(); - - // Do a checkpoint periodically. - periodicCheckpointTask = new Runnable() { - public void run() { - checkpoint(false); - } - }; - Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval); - - periodicCleanupTask = new Runnable() { - public void run() { - cleanup(); - } - }; - Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval); - - } - - - public void stop() throws Exception { - - if( !started.compareAndSet(true, false) ) - return; - - this.usageManager.removeUsageListener(this); - Scheduler.cancel(periodicCheckpointTask); - Scheduler.cancel(periodicCleanupTask); - - - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - QuickMessageStore ms = iterator.next(); - ms.stop(); - } - - iterator = topics.values().iterator(); - while (iterator.hasNext()) { - final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); - ms.stop(); - } - - // Take one final checkpoint and stop checkpoint processing. - checkpoint(true); - checkpointTask.shutdown(); - - queues.clear(); - topics.clear(); - - IOException firstException = null; - referenceStoreAdapter.stop(); - try { - log.debug("Journal close"); - asyncDataManager.close(); - } catch (Exception e) { - firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); - } - - if (firstException != null) { - throw firstException; - } - } - - - /** - * When we checkpoint we move all the journalled data to long term storage. - * @param stopping - * - * @param b - */ - public void checkpoint(boolean sync) { - try { - if (asyncDataManager == null ) - throw new IllegalStateException("Journal is closed."); - - CountDownLatch latch = null; - synchronized(this) { - latch = nextCheckpointCountDownLatch; - } - - checkpointTask.wakeup(); - - if (sync) { - log.debug("Waitng for checkpoint to complete."); - latch.await(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Request to start checkpoint failed: " + e, e); - } - } - - /** - * This does the actual checkpoint. - * @return - */ - public boolean doCheckpoint() { - CountDownLatch latch = null; - synchronized(this) { - latch = nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch = new CountDownLatch(1); - } - try { - - log.debug("Checkpoint started."); - Location newMark = null; - - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - final QuickMessageStore ms = iterator.next(); - Location mark = (Location) ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; - } - } - - iterator = topics.values().iterator(); - while (iterator.hasNext()) { - final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); - Location mark = (Location) ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; - } - } - - try { - if (newMark != null) { - log.debug("Marking journal at: " + newMark); - asyncDataManager.setMark(newMark, false); - writeTraceMessage("CHECKPOINT "+new Date(), true); - } - } - catch (Exception e) { - log.error("Failed to mark the Journal: " + e, e); - } - -// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) { -// // We may be check pointing more often than the checkpointInterval if under high use -// // But we don't want to clean up the db that often. -// long now = System.currentTimeMillis(); -// if( now > lastCleanup+checkpointInterval ) { -// lastCleanup = now; -// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup(); -// } -// } - - log.debug("Checkpoint done."); - } - finally { - latch.countDown(); - } - return true; - } - - /** - * Cleans up the data files - * @return - * @throws IOException - */ - public void cleanup() { - - try { - Set inUse = referenceStoreAdapter.getReferenceFileIdsInUse(); - asyncDataManager.consolidateDataFilesNotIn(inUse); - } catch (IOException e) { - log.error("Could not cleanup data files: "+e, e); - } - - } - - - public Set getDestinations() { - Set destinations = new HashSet(referenceStoreAdapter.getDestinations()); - destinations.addAll(queues.keySet()); - destinations.addAll(topics.keySet()); - return destinations; - } - - private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { - if (destination.isQueue()) { - return createQueueMessageStore((ActiveMQQueue) destination); - } - else { - return createTopicMessageStore((ActiveMQTopic) destination); - } - } - - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - QuickMessageStore store = queues.get(destination); - if (store == null) { - ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); - store = new QuickMessageStore(this, checkpointStore, destination); - try { - store.start(); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - queues.put(destination, store); - } - return store; - } - - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { - QuickTopicMessageStore store = (QuickTopicMessageStore) topics.get(destinationName); - if (store == null) { - TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); - store = new QuickTopicMessageStore(this, checkpointStore, destinationName); - try { - store.start(); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - topics.put(destinationName, store); - } - return store; - } - - public TransactionStore createTransactionStore() throws IOException { - return transactionStore; - } - - public long getLastMessageBrokerSequenceId() throws IOException { - return referenceStoreAdapter.getLastMessageBrokerSequenceId(); - } - - public void beginTransaction(ConnectionContext context) throws IOException { - referenceStoreAdapter.beginTransaction(context); - } - - public void commitTransaction(ConnectionContext context) throws IOException { - referenceStoreAdapter.commitTransaction(context); - } - - public void rollbackTransaction(ConnectionContext context) throws IOException { - referenceStoreAdapter.rollbackTransaction(context); - } - - - /** - * @param location - * @return - * @throws IOException - */ - public DataStructure readCommand(Location location) throws IOException { - try { - ByteSequence packet = asyncDataManager.read(location); - return (DataStructure) wireFormat.unmarshal(packet); - } catch (IOException e) { - throw createReadException(location, e); - } - } - - /** - * Move all the messages that were in the journal into long term storage. We - * just replay and do a checkpoint. - * - * @throws IOException - * @throws IOException - * @throws InvalidLocationException - * @throws IllegalStateException - */ - private void recover() throws IllegalStateException, IOException { - - Location pos = null; - int redoCounter = 0; - - log.info("Journal Recovery Started from: " + asyncDataManager); - long start = System.currentTimeMillis(); - ConnectionContext context = new ConnectionContext(); - - // While we have records in the journal. - while ((pos = asyncDataManager.getNextLocation(pos)) != null) { - ByteSequence data = asyncDataManager.read(pos); - DataStructure c = (DataStructure) wireFormat.unmarshal(data); - - if (c instanceof Message ) { - Message message = (Message) c; - QuickMessageStore store = (QuickMessageStore) createMessageStore(message.getDestination()); - if ( message.isInTransaction()) { - transactionStore.addMessage(store, message, pos); - } - else { - if( store.replayAddMessage(context, message, pos) ) { - redoCounter++; - } - } - } else { - switch (c.getDataStructureType()) { - case JournalQueueAck.DATA_STRUCTURE_TYPE: - { - JournalQueueAck command = (JournalQueueAck) c; - QuickMessageStore store = (QuickMessageStore) createMessageStore(command.getDestination()); - if (command.getMessageAck().isInTransaction()) { - transactionStore.removeMessage(store, command.getMessageAck(), pos); - } - else { - if( store.replayRemoveMessage(context, command.getMessageAck()) ) { - redoCounter++; - } - } - } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: - { - JournalTopicAck command = (JournalTopicAck) c; - QuickTopicMessageStore store = (QuickTopicMessageStore) createMessageStore(command.getDestination()); - if (command.getTransactionId() != null) { - transactionStore.acknowledge(store, command, pos); - } - else { - if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) { - redoCounter++; - } - } - } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: - { - JournalTransaction command = (JournalTransaction) c; - try { - // Try to replay the packet. - switch (command.getType()) { - case JournalTransaction.XA_PREPARE: - transactionStore.replayPrepare(command.getTransactionId()); - break; - case JournalTransaction.XA_COMMIT: - case JournalTransaction.LOCAL_COMMIT: - Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); - if (tx == null) - break; // We may be trying to replay a commit that - // was already committed. - - // Replay the committed operations. - tx.getOperations(); - for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); - if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - if( op.store.replayAddMessage(context, (Message)op.data, op.location) ) - redoCounter++; - } - if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - if( op.store.replayRemoveMessage(context, (MessageAck) op.data) ) - redoCounter++; - } - if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { - JournalTopicAck ack = (JournalTopicAck) op.data; - if( ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) { - redoCounter++; - } - } - } - break; - case JournalTransaction.LOCAL_ROLLBACK: - case JournalTransaction.XA_ROLLBACK: - transactionStore.replayRollback(command.getTransactionId()); - break; - } - } - catch (IOException e) { - log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); - } - } - break; - case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace = (JournalTrace) c; - log.debug("TRACE Entry: " + trace.getMessage()); - break; - default: - log.error("Unknown type of record in transaction log which will be discarded: " + c); - } - } - } - Location location = writeTraceMessage("RECOVERED "+new Date(), true); - asyncDataManager.setMark(location, true); - long end = System.currentTimeMillis(); - - log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds."); - } - - private IOException createReadException(Location location, Exception e) { - return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); - } - - protected IOException createWriteException(DataStructure packet, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); - } - - protected IOException createWriteException(String command, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); - } - - protected IOException createRecoveryFailedException(Exception e) { - return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); - } - - /** - * - * @param command - * @param sync - * @return - * @throws IOException - */ - public Location writeCommand(DataStructure command, boolean sync) throws IOException { - return asyncDataManager.write(wireFormat.marshal(command), sync); - } - - private Location writeTraceMessage(String message, boolean sync) throws IOException { - JournalTrace trace = new JournalTrace(); - trace.setMessage(message); - return writeCommand(trace, sync); - } - - public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { - newPercentUsage = ((newPercentUsage)/10)*10; - oldPercentUsage = ((oldPercentUsage)/10)*10; - if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { - checkpoint(false); - } - } - - public QuickTransactionStore getTransactionStore() { - return transactionStore; - } - - public void deleteAllMessages() throws IOException { - deleteAllMessages=true; - } - - - - public String toString(){ - return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")"; - } - - /////////////////////////////////////////////////////////////////// - // Subclass overridables - /////////////////////////////////////////////////////////////////// - protected AsyncDataManager createAsyncDataManager() { - AsyncDataManager manager = new AsyncDataManager(); - manager.setDirectory(new File(directory, "journal")); - return manager; - } - - protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { - KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory); - return adaptor; - } - - protected TaskRunnerFactory createTaskRunnerFactory() { - return DefaultThreadPools.getDefaultTaskRunnerFactory(); - } - - - /////////////////////////////////////////////////////////////////// - // Property Accessors - /////////////////////////////////////////////////////////////////// - - public AsyncDataManager getAsyncDataManager() { - return asyncDataManager; - } - public void setAsyncDataManager(AsyncDataManager asyncDataManager) { - this.asyncDataManager = asyncDataManager; - } - - public ReferenceStoreAdapter getReferenceStoreAdapter() { - return referenceStoreAdapter; - } - public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { - this.referenceStoreAdapter = referenceStoreAdapter; - } - - public TaskRunnerFactory getTaskRunnerFactory() { - return taskRunnerFactory; - } - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { - this.taskRunnerFactory = taskRunnerFactory; - } - - /** - * @return Returns the wireFormat. - */ - public WireFormat getWireFormat() { - return wireFormat; - } - public void setWireFormat(WireFormat wireFormat) { - this.wireFormat = wireFormat; - } - - public UsageManager getUsageManager() { - return usageManager; - } - public void setUsageManager(UsageManager usageManager) { - this.usageManager = usageManager; - } - - public int getMaxCheckpointMessageAddSize() { - return maxCheckpointMessageAddSize; - } - public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { - this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; - } - - public int getMaxCheckpointWorkers() { - return maxCheckpointWorkers; - } - public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { - this.maxCheckpointWorkers = maxCheckpointWorkers; - } - - public File getDirectory() { - return directory; - } - - public void setDirectory(File directory) { - this.directory = directory; - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java deleted file mode 100644 index 15dc9425bf..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.quick; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.HashMap; -import java.util.Iterator; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.kaha.impl.async.Location; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.SubscriptionKey; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A MessageStore that uses a Journal to store it's messages. - * - * @version $Revision: 1.13 $ - */ -public class QuickTopicMessageStore extends QuickMessageStore implements TopicMessageStore { - - private static final Log log = LogFactory.getLog(QuickTopicMessageStore.class); - - private TopicReferenceStore topicReferenceStore; - private HashMap ackedLastAckLocations = new HashMap(); - - public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { - super(adapter, topicReferenceStore, destinationName); - this.topicReferenceStore = topicReferenceStore; - } - - public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { - flush(); - topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); - } - - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{ - flush(); - topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener)); - } - - public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { - return topicReferenceStore.lookupSubscription(clientId, subscriptionName); - } - - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { - flush(); - topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive); - } - - /** - */ - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { - final boolean debug = log.isDebugEnabled(); - - JournalTopicAck ack = new JournalTopicAck(); - ack.setDestination(destination); - ack.setMessageId(messageId); - ack.setMessageSequenceId(messageId.getBrokerSequenceId()); - ack.setSubscritionName(subscriptionName); - ack.setClientId(clientId); - ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null); - final Location location = peristenceAdapter.writeCommand(ack, false); - - final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled acknowledge for: "+messageId+", at: "+location); - acknowledge(messageId, location, key); - } else { - if( debug ) - log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); - synchronized (this) { - inFlightTxLocations.add(location); - } - transactionStore.acknowledge(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); - synchronized (QuickTopicMessageStore.this) { - inFlightTxLocations.remove(location); - acknowledge(messageId, location, key); - } - } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); - synchronized (QuickTopicMessageStore.this) { - inFlightTxLocations.remove(location); - } - } - }); - } - - } - - public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { - try { - SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); - if( sub != null ) { - topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId); - return true; - } - } - catch (Throwable e) { - log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); - } - return false; - } - - - /** - * @param messageId - * @param location - * @param key - * @throws InterruptedIOException - */ - private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException { - synchronized(this) { - lastLocation = location; - ackedLastAckLocations.put(key, messageId); - } - try { - asyncWriteTask.wakeup(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - - @Override - protected Location doAsyncWrite() throws IOException { - - final HashMap cpAckedLastAckLocations; - - // swap out the hash maps.. - synchronized (this) { - cpAckedLastAckLocations = this.ackedLastAckLocations; - this.ackedLastAckLocations = new HashMap(); - } - - Location location = super.doAsyncWrite(); - - transactionTemplate.run(new Callback() { - public void execute() throws Exception { - // Checkpoint the acknowledged messages. - Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); - while (iterator.hasNext()) { - SubscriptionKey subscriptionKey = iterator.next(); - MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); - topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); - } - } - } ); - - return location; - } - - /** - * @return Returns the longTermStore. - */ - public TopicReferenceStore getTopicReferenceStore() { - return topicReferenceStore; - } - - public void deleteSubscription(String clientId, String subscriptionName) throws IOException { - topicReferenceStore.deleteSubscription(clientId, subscriptionName); - } - - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - return topicReferenceStore.getAllSubscriptions(); - } - - - public int getMessageCount(String clientId,String subscriberName) throws IOException{ - flush(); - return topicReferenceStore.getMessageCount(clientId,subscriberName); - } - - public void resetBatching(String clientId,String subscriptionName) { - topicReferenceStore.resetBatching(clientId,subscriptionName); - } - - - -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java deleted file mode 100644 index c1ff17c91d..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java +++ /dev/null @@ -1,340 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.quick; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; - -import javax.transaction.xa.XAException; - -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.JournalTransaction; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.kaha.impl.async.Location; -import org.apache.activemq.store.TransactionRecoveryListener; -import org.apache.activemq.store.TransactionStore; - - -/** - */ -public class QuickTransactionStore implements TransactionStore { - - private final QuickPersistenceAdapter peristenceAdapter; - Map inflightTransactions = new LinkedHashMap(); - Map preparedTransactions = new LinkedHashMap(); - private boolean doingRecover; - - - public static class TxOperation { - - static final byte ADD_OPERATION_TYPE = 0; - static final byte REMOVE_OPERATION_TYPE = 1; - static final byte ACK_OPERATION_TYPE = 3; - - public byte operationType; - public QuickMessageStore store; - public Object data; - public Location location; - - public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) { - this.operationType=operationType; - this.store=store; - this.data=data; - this.location=location; - } - - } - /** - * Operations - * @version $Revision: 1.6 $ - */ - public static class Tx { - - private final Location location; - private ArrayList operations = new ArrayList(); - - public Tx(Location location) { - this.location=location; - } - - public void add(QuickMessageStore store, Message msg, Location location) { - operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location)); - } - - public void add(QuickMessageStore store, MessageAck ack) { - operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null)); - } - - public void add(QuickTopicMessageStore store, JournalTopicAck ack) { - operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null)); - } - - public Message[] getMessages() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { - list.add(op.data); - } - } - Message rc[] = new Message[list.size()]; - list.toArray(rc); - return rc; - } - - public MessageAck[] getAcks() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { - list.add(op.data); - } - } - MessageAck rc[] = new MessageAck[list.size()]; - list.toArray(rc); - return rc; - } - - public ArrayList getOperations() { - return operations; - } - - } - - public QuickTransactionStore(QuickPersistenceAdapter adapter) { - this.peristenceAdapter = adapter; - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) - */ - public void prepare(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=inflightTransactions.remove(txid); - } - if(tx==null) - return; - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); - synchronized(preparedTransactions){ - preparedTransactions.put(txid,tx); - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) - */ - public void replayPrepare(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=inflightTransactions.remove(txid); - } - if(tx==null) - return; - synchronized(preparedTransactions){ - preparedTransactions.put(txid,tx); - } - } - - public Tx getTx(TransactionId txid,Location location){ - Tx tx=null; - synchronized(inflightTransactions){ - tx=inflightTransactions.get(txid); - } - if(tx==null){ - tx=new Tx(location); - inflightTransactions.put(txid,tx); - } - return tx; - } - - /** - * @throws XAException - * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) - */ - public void commit(TransactionId txid,boolean wasPrepared) throws IOException{ - Tx tx; - if(wasPrepared){ - synchronized(preparedTransactions){ - tx=preparedTransactions.remove(txid); - } - }else{ - synchronized(inflightTransactions){ - tx=inflightTransactions.remove(txid); - } - } - if(tx==null) - return; - if(txid.isXATransaction()){ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); - }else{ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), - true); - } - } - - /** - * @throws XAException - * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) - */ - public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ - if(wasPrepared){ - synchronized(preparedTransactions){ - return preparedTransactions.remove(txid); - } - }else{ - synchronized(inflightTransactions){ - return inflightTransactions.remove(txid); - } - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) - */ - public void rollback(TransactionId txid) throws IOException{ - Tx tx=null; - synchronized(inflightTransactions){ - tx=inflightTransactions.remove(txid); - } - if(tx!=null) - synchronized(preparedTransactions){ - tx=preparedTransactions.remove(txid); - } - if(tx!=null){ - if(txid.isXATransaction()){ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); - }else{ - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), - true); - } - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) - */ - public void replayRollback(TransactionId txid) throws IOException{ - boolean inflight=false; - synchronized(inflightTransactions){ - inflight=inflightTransactions.remove(txid)!=null; - } - if(inflight){ - synchronized(preparedTransactions){ - preparedTransactions.remove(txid); - } - } - } - - public void start() throws Exception { - } - - public void stop() throws Exception { - } - - synchronized public void recover(TransactionRecoveryListener listener) throws IOException{ - // All the in-flight transactions get rolled back.. - synchronized(inflightTransactions){ - inflightTransactions.clear(); - } - this.doingRecover=true; - try{ - Map txs=null; - synchronized(preparedTransactions){ - txs=new LinkedHashMap(preparedTransactions); - } - for(Iterator iter=txs.keySet().iterator();iter.hasNext();){ - Object txid=iter.next(); - Tx tx=txs.get(txid); - listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); - } - }finally{ - this.doingRecover=false; - } - } - - /** - * @param message - * @throws IOException - */ - void addMessage(QuickMessageStore store, Message message, Location location) throws IOException { - Tx tx = getTx(message.getTransactionId(), location); - tx.add(store, message, location); - } - - /** - * @param ack - * @throws IOException - */ - public void removeMessage(QuickMessageStore store, MessageAck ack, Location location) throws IOException { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); - } - - - public void acknowledge(QuickTopicMessageStore store, JournalTopicAck ack, Location location) { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); - } - - - public Location checkpoint() throws IOException{ - // Nothing really to checkpoint.. since, we don't - // checkpoint tx operations in to long term store until they are committed. - // But we keep track of the first location of an operation - // that was associated with an active tx. The journal can not - // roll over active tx records. - Location rc=null; - synchronized(inflightTransactions){ - for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ - Tx tx=iter.next(); - Location location=tx.location; - if(rc==null||rc.compareTo(location)<0){ - rc=location; - } - } - } - synchronized(preparedTransactions){ - for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ - Tx tx=iter.next(); - Location location=tx.location; - if(rc==null||rc.compareTo(location)<0){ - rc=location; - } - } - return rc; - } - } - - public boolean isDoingRecover() { - return doingRecover; - } - - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java deleted file mode 100644 index 377836eaf5..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.quick; - -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -final class RecoveryListenerAdapter implements MessageRecoveryListener { - static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class); - - private final MessageStore store; - private final MessageRecoveryListener listener; - - RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) { - this.store = store; - this.listener = listener; - } - - public void finished() { - listener.finished(); - } - - public boolean hasSpace() { - return listener.hasSpace(); - } - - public void recoverMessage(Message message) throws Exception { - listener.recoverMessage(message); - } - - public void recoverMessageReference(MessageId ref) throws Exception { - Message message = this.store.getMessage(ref); - if( message !=null ){ - listener.recoverMessage( message ); - } else { - log.error("Message id "+ref+" could not be recovered from the data store!"); - } - - } -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html b/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html deleted file mode 100644 index 3fdd3ad6b0..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - -experimental store implementation - - - diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java deleted file mode 100644 index 530dffb9b5..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.rapid; - -import org.apache.activeio.journal.active.Location; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; - -public class RapidMessageReference { - public final MessageId messageId; - public final Location location; - - public RapidMessageReference(MessageId messageId, Location location) { - this.messageId = messageId; - this.location=location; - } - public RapidMessageReference(Message message, Location location) { - this.messageId = message.getMessageId(); - this.location=location; - } - - public MessageId getMessageId() { - return messageId; - } - - public Location getLocation() { - return location; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java deleted file mode 100644 index 320aaee266..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.rapid; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.activeio.journal.active.Location; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.Marshaller; - -public class RapidMessageReferenceMarshaller implements Marshaller{ - - - - public Object readPayload(DataInput dataIn) throws IOException{ - MessageId mid = new MessageId(dataIn.readUTF()); - Location loc = new Location(dataIn.readInt(),dataIn.readInt()); - RapidMessageReference rmr = new RapidMessageReference(mid,loc); - return rmr; - } - - public void writePayload(Object object,DataOutput dataOut) throws IOException{ - RapidMessageReference rmr = (RapidMessageReference)object; - dataOut.writeUTF(rmr.getMessageId().toString()); - dataOut.writeInt(rmr.getLocation().getLogFileId()); - dataOut.writeInt(rmr.getLocation().getLogFileOffset()); - - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java deleted file mode 100755 index b0dd62ac97..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java +++ /dev/null @@ -1,379 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.rapid; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; - -import org.apache.activeio.journal.active.Location; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.JournalQueueAck; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.ListContainer; -import org.apache.activemq.kaha.MapContainer; -import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.memory.UsageListener; -import org.apache.activemq.memory.UsageManager; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.LRUCache; -import org.apache.activemq.util.TransactionTemplate; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A MessageStore that uses a Journal to store it's messages. - * - * @version $Revision: 1.14 $ - */ -public class RapidMessageStore implements MessageStore, UsageListener { - - private static final Log log = LogFactory.getLog(RapidMessageStore.class); - - protected final RapidPersistenceAdapter peristenceAdapter; - protected final RapidTransactionStore transactionStore; - protected final ListContainer messageContainer; - protected final ActiveMQDestination destination; - protected final TransactionTemplate transactionTemplate; - protected final LRUCache cache; - protected UsageManager usageManager; - protected StoreEntry batchEntry = null; - - - - protected Location lastLocation; - protected HashSet inFlightTxLocations = new HashSet(); - - public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, ListContainer container, int maximumCacheSize) { - this.peristenceAdapter = adapter; - this.transactionStore = adapter.getTransactionStore(); - this.messageContainer = container; - this.destination = destination; - this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); - this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false); -// populate the cache - StoreEntry entry=messageContainer.getFirst(); - int count = 0; - if(entry!=null){ - do{ - RapidMessageReference msg = (RapidMessageReference)messageContainer.get(entry); - cache.put(msg.getMessageId(),entry); - entry = messageContainer.getNext(entry); - count++; - }while(entry!=null && count < maximumCacheSize); - } - } - - - /** - * Not synchronized since the Journal has better throughput if you increase - * the number of concurrent writes that it is doing. - */ - public synchronized void addMessage(ConnectionContext context, final Message message) throws IOException { - - final MessageId id = message.getMessageId(); - - final boolean debug = log.isDebugEnabled(); - final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); - final RapidMessageReference md = new RapidMessageReference(message, location); - - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message add for: "+id+", at: "+location); - addMessage(md); - } else { - message.incrementReferenceCount(); - if( debug ) - log.debug("Journalled transacted message add for: "+id+", at: "+location); - synchronized( this ) { - inFlightTxLocations.add(location); - } - transactionStore.addMessage(this, message, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message add commit for: "+id+", at: "+location); - message.decrementReferenceCount(); - synchronized( RapidMessageStore.this ) { - inFlightTxLocations.remove(location); - addMessage(md); - } - } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message add rollback for: "+id+", at: "+location); - message.decrementReferenceCount(); - synchronized( RapidMessageStore.this ) { - inFlightTxLocations.remove(location); - } - } - }); - } - } - - private synchronized void addMessage(final RapidMessageReference messageReference){ - StoreEntry item=messageContainer.placeLast(messageReference); - cache.put(messageReference.getMessageId(),item); - } - - static protected String toString(Location location) { - Location l = (Location) location; - return l.getLogFileId()+":"+l.getLogFileOffset(); - } - - static protected Location toLocation(String t) { - String[] strings = t.split(":"); - if( strings.length!=2 ) - throw new IllegalArgumentException("Invalid location: "+t); - return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1])); - } - - public void replayAddMessage(ConnectionContext context, Message message, Location location) { - try { - RapidMessageReference messageReference = new RapidMessageReference(message, location); - addMessage(messageReference); - } - catch (Throwable e) { - log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); - } - } - - /** - */ - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { - final boolean debug = log.isDebugEnabled(); - JournalQueueAck remove = new JournalQueueAck(); - remove.setDestination(destination); - remove.setMessageAck(ack); - - final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); - if( !context.isInTransaction() ) { - if( debug ) - log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); - removeMessage(ack.getLastMessageId()); - } else { - if( debug ) - log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); - synchronized( this ) { - inFlightTxLocations.add(location); - } - transactionStore.removeMessage(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception { - if( debug ) - log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); - synchronized( RapidMessageStore.this ) { - inFlightTxLocations.remove(location); - removeMessage(ack.getLastMessageId()); - } - } - public void afterRollback() throws Exception { - if( debug ) - log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); - synchronized( RapidMessageStore.this ) { - inFlightTxLocations.remove(location); - } - } - }); - - } - } - - - public synchronized void removeMessage(MessageId msgId) throws IOException{ - StoreEntry entry=(StoreEntry)cache.remove(msgId); - if(entry!=null){ - entry = messageContainer.refresh(entry); - messageContainer.remove(entry); - }else{ - for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { - RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry); - if(msg.getMessageId().equals(msgId)){ - messageContainer.remove(entry); - break; - } - } - } - if (messageContainer.isEmpty()) { - resetBatching(); - } - } - - public void replayRemoveMessage(ConnectionContext context, MessageAck ack) { - try { - MessageId id = ack.getLastMessageId(); - removeMessage(id); - } - catch (Throwable e) { - log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); - } - } - - - public synchronized Message getMessage(MessageId identity) throws IOException{ - RapidMessageReference result=null; - StoreEntry entry=(StoreEntry)cache.get(identity); - if(entry!=null){ - entry = messageContainer.refresh(entry); - result = (RapidMessageReference)messageContainer.get(entry); - }else{ - for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { - RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry); - if(msg.getMessageId().equals(identity)){ - result=msg; - cache.put(identity,entry); - break; - } - } - } - if (result == null ) - return null; - return (Message) peristenceAdapter.readCommand(result.getLocation()); - } - - /** - * Replays the checkpointStore first as those messages are the oldest ones, - * then messages are replayed from the transaction log and then the cache is - * updated. - * - * @param listener - * @throws Exception - */ - - public synchronized void recover(MessageRecoveryListener listener) throws Exception{ - for(Iterator iter=messageContainer.iterator();iter.hasNext();){ - RapidMessageReference messageReference=(RapidMessageReference) iter.next(); - Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation()); - listener.recoverMessage(m); - } - listener.finished(); - } - - public void start() { - if( this.usageManager != null ) - this.usageManager.addUsageListener(this); - } - - public void stop() { - if( this.usageManager != null ) - this.usageManager.removeUsageListener(this); - } - - /** - * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) - */ - public synchronized void removeAllMessages(ConnectionContext context) throws IOException { - messageContainer.clear(); - cache.clear(); - } - - public ActiveMQDestination getDestination() { - return destination; - } - - public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { - throw new IOException("Does not support message references."); - } - - public String getMessageReference(MessageId identity) throws IOException { - throw new IOException("Does not support message references."); - } - - - public void setUsageManager(UsageManager usageManager) { - this.usageManager = usageManager; - } - - /** - * @return - * @throws IOException - */ - public Location checkpoint() throws IOException { - - ArrayList cpActiveJournalLocations; - - // swap out the message hash maps.. - synchronized (this) { - cpActiveJournalLocations=new ArrayList(inFlightTxLocations); - } - - if( cpActiveJournalLocations.size() > 0 ) { - Collections.sort(cpActiveJournalLocations); - return (Location) cpActiveJournalLocations.get(0); - } else { - return lastLocation; - } - } - - - - public int getMessageCount(){ - return messageContainer.size(); - } - - public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ - StoreEntry entry=batchEntry; - if(entry==null){ - entry=messageContainer.getFirst(); - }else{ - entry=messageContainer.refresh(entry); - entry=messageContainer.getNext(entry); - } - if(entry!=null){ - int count=0; - do{ - RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(entry); - Message msg=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); - if(msg!=null){ - Message message=(Message)msg; - listener.recoverMessage(message); - count++; - } - batchEntry=entry; - entry=messageContainer.getNext(entry); - }while(entry!=null&&countlastCheckpointRequest+checkpointInterval){ - checkpoint(false,true); - } - } - }; - } - - public RapidPersistenceAdapter(Journal journal,TaskRunnerFactory taskRunnerFactory) throws IOException{ - this.journal=journal; - journal.setJournalEventListener(this); - File dir=((JournalImpl)journal).getLogDirectory(); - String name=dir.getAbsolutePath()+File.separator+"kaha.db"; - store=StoreFactory.open(name,"rw"); - checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){ - - public boolean iterate(){ - return doCheckpoint(); - } - },"ActiveMQ Checkpoint Worker"); - } - - public Set getDestinations(){ - Set rc=new HashSet(); - try{ - for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ - Object obj=i.next(); - if(obj instanceof ActiveMQDestination){ - rc.add(obj); - } - } - }catch(IOException e){ - log.error("Failed to get destinations ",e); - } - return rc; - } - - private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{ - if(destination.isQueue()){ - return createQueueMessageStore((ActiveMQQueue)destination); - }else{ - return createTopicMessageStore((ActiveMQTopic)destination); - } - } - - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ - RapidMessageStore store=(RapidMessageStore)queues.get(destination); - if(store==null){ - ListContainer messageContainer=getListContainer(destination,"topic-data"); - store=new RapidMessageStore(this,destination,messageContainer,maximumDestinationCacheSize); - queues.put(destination,store); - } - return store; - } - - protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ - MapContainer container=store.getMapContainer(id,containerName); - container.setKeyMarshaller(new StringMarshaller()); - if(useExternalMessageReferences){ - container.setValueMarshaller(new StringMarshaller()); - }else{ - container.setValueMarshaller(new CommandMarshaller(wireFormat)); - } - container.load(); - return container; - } - - protected ListContainer getListContainer(Object id,String containerName) throws IOException{ - Store store=getStore(); - ListContainer container=store.getListContainer(id,containerName); - container.setMarshaller(new RapidMessageReferenceMarshaller()); - container.load(); - return container; - } - - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ - TopicMessageStore rc=(TopicMessageStore)topics.get(destination); - if(rc==null){ - Store store=getStore(); - ListContainer messageContainer=getListContainer(destination,"topic-data"); - MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); - ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); - ackContainer.setMarshaller(new TopicSubAckMarshaller()); - rc=new RapidTopicMessageStore(this,store,messageContainer,ackContainer,subsContainer,destination, - maximumDestinationCacheSize); - topics.put(destination,rc); - } - return rc; - } - - public TransactionStore createTransactionStore() throws IOException{ - return transactionStore; - } - - public long getLastMessageBrokerSequenceId() throws IOException{ - // TODO: implement this. - return 0; - } - - public void beginTransaction(ConnectionContext context) throws IOException{ - } - - public void commitTransaction(ConnectionContext context) throws IOException{ - } - - public void rollbackTransaction(ConnectionContext context) throws IOException{ - } - - public synchronized void start() throws Exception{ - if(!started.compareAndSet(false,true)) - return; - checkpointExecutor=new ThreadPoolExecutor(maxCheckpointWorkers,maxCheckpointWorkers,30,TimeUnit.SECONDS, - new LinkedBlockingQueue(),new ThreadFactory(){ - - public Thread newThread(Runnable runable){ - Thread t=new Thread(runable,"Journal checkpoint worker"); - t.setPriority(7); - return t; - } - }); - // checkpointExecutor.allowCoreThreadTimeOut(true); - createTransactionStore(); - recover(); - // Do a checkpoint periodically. - Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval/10); - } - - public void stop() throws Exception{ - if(!started.compareAndSet(true,false)) - return; - Scheduler.cancel(periodicCheckpointTask); - // Take one final checkpoint and stop checkpoint processing. - checkpoint(false,true); - checkpointTask.shutdown(); - checkpointExecutor.shutdown(); - queues.clear(); - topics.clear(); - IOException firstException=null; - try{ - journal.close(); - }catch(Exception e){ - firstException=IOExceptionSupport.create("Failed to close journals: "+e,e); - } - store.close(); - if(firstException!=null){ - throw firstException; - } - } - - // Properties - // ------------------------------------------------------------------------- - /** - * @return Returns the wireFormat. - */ - public WireFormat getWireFormat(){ - return wireFormat; - } - - // Implementation methods - // ------------------------------------------------------------------------- - /** - * The Journal give us a call back so that we can move old data out of the journal. Taking a checkpoint does this - * for us. - * - * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) - */ - public void overflowNotification(RecordLocation safeLocation){ - checkpoint(false,true); - } - - /** - * When we checkpoint we move all the journalled data to long term storage. - * - * @param stopping - * - * @param b - */ - public void checkpoint(boolean sync,boolean fullCheckpoint){ - try{ - if(journal==null) - throw new IllegalStateException("Journal is closed."); - long now=System.currentTimeMillis(); - CountDownLatch latch=null; - synchronized(this){ - latch=nextCheckpointCountDownLatch; - lastCheckpointRequest=now; - if(fullCheckpoint){ - this.fullCheckPoint=true; - } - } - checkpointTask.wakeup(); - if(sync){ - log.debug("Waking for checkpoint to complete."); - latch.await(); - } - }catch(InterruptedException e){ - log.warn("Request to start checkpoint failed: "+e,e); - } - } - - /** - * This does the actual checkpoint. - * - * @return - */ - public boolean doCheckpoint(){ - CountDownLatch latch=null; - boolean fullCheckpoint; - synchronized(this){ - latch=nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch=new CountDownLatch(1); - fullCheckpoint=this.fullCheckPoint; - this.fullCheckPoint=false; - } - try{ - log.debug("Checkpoint started."); - RecordLocation newMark=null; - ArrayList futureTasks=new ArrayList(queues.size()+topics.size()); - // - // We do many partial checkpoints (fullCheckpoint==false) to move topic messages - // to long term store as soon as possible. - // - // We want to avoid doing that for queue messages since removes the come in the same - // checkpoint cycle will nullify the previous message add. Therefore, we only - // checkpoint queues on the fullCheckpoint cycles. - // - if(fullCheckpoint){ - Iterator iterator=queues.values().iterator(); - while(iterator.hasNext()){ - try{ - final RapidMessageStore ms=(RapidMessageStore)iterator.next(); - FutureTask task=new FutureTask(new Callable(){ - - public Object call() throws Exception{ - return ms.checkpoint(); - } - }); - futureTasks.add(task); - checkpointExecutor.execute(task); - }catch(Exception e){ - log.error("Failed to checkpoint a message store: "+e,e); - } - } - } - Iterator iterator=topics.values().iterator(); - while(iterator.hasNext()){ - try{ - final RapidTopicMessageStore ms=(RapidTopicMessageStore)iterator.next(); - FutureTask task=new FutureTask(new Callable(){ - - public Object call() throws Exception{ - return ms.checkpoint(); - } - }); - futureTasks.add(task); - checkpointExecutor.execute(task); - }catch(Exception e){ - log.error("Failed to checkpoint a message store: "+e,e); - } - } - try{ - for(Iterator iter=futureTasks.iterator();iter.hasNext();){ - FutureTask ft=(FutureTask)iter.next(); - RecordLocation mark=(RecordLocation)ft.get(); - // We only set a newMark on full checkpoints. - if(fullCheckpoint){ - if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ - newMark=mark; - } - } - } - }catch(Throwable e){ - log.error("Failed to checkpoint a message store: "+e,e); - } - if(fullCheckpoint){ - try{ - if(newMark!=null){ - log.debug("Marking journal at: "+newMark); - journal.setMark(newMark,true); - } - }catch(Exception e){ - log.error("Failed to mark the Journal: "+e,e); - } - // TODO: do we need to implement a periodic clean up? - // if (longTermPersistence instanceof JDBCPersistenceAdapter) { - // // We may be check pointing more often than the checkpointInterval if under high use - // // But we don't want to clean up the db that often. - // long now = System.currentTimeMillis(); - // if( now > lastCleanup+checkpointInterval ) { - // lastCleanup = now; - // ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); - // } - // } - } - log.debug("Checkpoint done."); - }finally{ - latch.countDown(); - } - synchronized(this){ - return this.fullCheckPoint; - } - } - - /** - * @param location - * @return - * @throws IOException - */ - public DataStructure readCommand(RecordLocation location) throws IOException{ - try{ - Packet data=journal.read(location); - return (DataStructure)wireFormat.unmarshal(toByteSequence(data)); - }catch(InvalidRecordLocationException e){ - throw createReadException(location,e); - }catch(IOException e){ - throw createReadException(location,e); - } - } - - /** - * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint. - * - * @throws IOException - * @throws IOException - * @throws InvalidRecordLocationException - * @throws IllegalStateException - */ - private void recover() throws IllegalStateException,InvalidRecordLocationException,IOException,IOException{ - Location pos=null; - int transactionCounter=0; - log.info("Journal Recovery Started."); - ConnectionContext context=new ConnectionContext(); - // While we have records in the journal. - while((pos=(Location)journal.getNextRecordLocation(pos))!=null){ - Packet data=journal.read(pos); - DataStructure c=(DataStructure)wireFormat.unmarshal(toByteSequence(data)); - if(c instanceof Message){ - Message message=(Message)c; - RapidMessageStore store=(RapidMessageStore)createMessageStore(message.getDestination()); - if(message.isInTransaction()){ - transactionStore.addMessage(store,message,pos); - }else{ - store.replayAddMessage(context,message,pos); - transactionCounter++; - } - }else{ - switch(c.getDataStructureType()){ - case JournalQueueAck.DATA_STRUCTURE_TYPE: { - JournalQueueAck command=(JournalQueueAck)c; - RapidMessageStore store=(RapidMessageStore)createMessageStore(command.getDestination()); - if(command.getMessageAck().isInTransaction()){ - transactionStore.removeMessage(store,command.getMessageAck(),pos); - }else{ - store.replayRemoveMessage(context,command.getMessageAck()); - transactionCounter++; - } - } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: { - JournalTopicAck command=(JournalTopicAck)c; - RapidTopicMessageStore store=(RapidTopicMessageStore)createMessageStore(command.getDestination()); - if(command.getTransactionId()!=null){ - transactionStore.acknowledge(store,command,pos); - }else{ - store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command - .getMessageId()); - transactionCounter++; - } - } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: { - JournalTransaction command=(JournalTransaction)c; - try{ - // Try to replay the packet. - switch(command.getType()){ - case JournalTransaction.XA_PREPARE: - transactionStore.replayPrepare(command.getTransactionId()); - break; - case JournalTransaction.XA_COMMIT: - case JournalTransaction.LOCAL_COMMIT: - Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared()); - if(tx==null) - break; // We may be trying to replay a commit that - // was already committed. - // Replay the committed operations. - for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){ - TxOperation op=(TxOperation)iter.next(); - if(op.operationType==TxOperation.ADD_OPERATION_TYPE){ - op.store.replayAddMessage(context,(Message)op.data,op.location); - } - if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){ - op.store.replayRemoveMessage(context,(MessageAck)op.data); - } - if(op.operationType==TxOperation.ACK_OPERATION_TYPE){ - JournalTopicAck ack=(JournalTopicAck)op.data; - ((RapidTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack - .getSubscritionName(),ack.getMessageId()); - } - } - transactionCounter++; - break; - case JournalTransaction.LOCAL_ROLLBACK: - case JournalTransaction.XA_ROLLBACK: - transactionStore.replayRollback(command.getTransactionId()); - break; - } - }catch(IOException e){ - log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e); - } - } - break; - case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace=(JournalTrace)c; - log.debug("TRACE Entry: "+trace.getMessage()); - break; - default: - log.error("Unknown type of record in transaction log which will be discarded: "+c); - } - } - } - RecordLocation location=writeTraceMessage("RECOVERED",true); - journal.setMark(location,true); - log.info("Journal Recovered: "+transactionCounter+" message(s) in transactions recovered."); - } - - private IOException createReadException(RecordLocation location,Exception e){ - return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e); - } - - protected IOException createWriteException(DataStructure packet,Exception e){ - return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e); - } - - protected IOException createWriteException(String command,Exception e){ - return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e); - } - - protected IOException createRecoveryFailedException(Exception e){ - return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e); - } - - /** - * - * @param command - * @param sync - * @return - * @throws IOException - */ - public Location writeCommand(DataStructure command,boolean sync) throws IOException{ - if(started.get()) - return (Location)journal.write(toPacket(wireFormat.marshal(command)),sync); - throw new IOException("closed"); - } - - private RecordLocation writeTraceMessage(String message,boolean sync) throws IOException{ - JournalTrace trace=new JournalTrace(); - trace.setMessage(message); - return writeCommand(trace,sync); - } - - public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ - if(newPercentUsage>80&&oldPercentUsage0){ - final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); - final RapidMessageReference md = new RapidMessageReference(message, location); - StoreEntry messageEntry=messageContainer.placeLast(md); - TopicSubAck tsa=new TopicSubAck(); - tsa.setCount(subscriberCount); - tsa.setMessageEntry(messageEntry); - StoreEntry ackEntry=ackContainer.placeLast(tsa); - for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ - TopicSubContainer container=(TopicSubContainer)i.next(); - ConsumerMessageRef ref=new ConsumerMessageRef(); - ref.setAckEntry(ackEntry); - ref.setMessageEntry(messageEntry); - container.add(ref); - } - } - } - - public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, - MessageId messageId) throws IOException{ - String subcriberId=getSubscriptionKey(clientId,subscriptionName); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); - if(container!=null){ - ConsumerMessageRef ref=(ConsumerMessageRef)container.remove(); - if(ref!=null){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); - if(tsa!=null){ - if(tsa.decrementCount()<=0){ - ackContainer.remove(ref.getAckEntry()); - messageContainer.remove(tsa.getMessageEntry()); - }else{ - ackContainer.update(ref.getAckEntry(),tsa); - } - } - } - } - } - - public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{ - return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); - } - - public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) - throws IOException{ - SubscriptionInfo info=new SubscriptionInfo(); - info.setDestination(destination); - info.setClientId(clientId); - info.setSelector(selector); - info.setSubcriptionName(subscriptionName); - String key=getSubscriptionKey(clientId,subscriptionName); - // if already exists - won't add it again as it causes data files - // to hang around - if(!subscriberContainer.containsKey(key)){ - subscriberContainer.put(key,info); - } - ListContainer container=addSubscriberMessageContainer(key); - if(retroactive){ - for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); - ConsumerMessageRef ref=new ConsumerMessageRef(); - ref.setAckEntry(entry); - ref.setMessageEntry(tsa.getMessageEntry()); - container.add(ref); - } - } - } - - public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{ - String key=getSubscriptionKey(clientId,subscriptionName); - removeSubscriberMessageContainer(key); - } - - public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) - throws Exception{ - String key=getSubscriptionKey(clientId,subscriptionName); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - if(container!=null){ - for(Iterator i=container.iterator();i.hasNext();){ - ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); - RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref - .getMessageEntry()); - if(messageReference!=null){ - Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); - listener.recoverMessage(m); - } - } - } - listener.finished(); - } - - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, - MessageRecoveryListener listener) throws Exception{ - String key=getSubscriptionKey(clientId,subscriptionName); - TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - if(container!=null){ - int count=0; - StoreEntry entry=container.getBatchEntry(); - if(entry==null){ - entry=container.getEntry(); - }else{ - entry=container.refreshEntry(entry); - entry=container.getNextEntry(entry); - } - if(entry!=null){ - do{ - ConsumerMessageRef consumerRef=container.get(entry); - RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef - .getMessageEntry()); - if(messageReference!=null){ - Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); - listener.recoverMessage(m); - count++; - } - container.setBatchEntry(entry); - entry=container.getNextEntry(entry); - }while(entry!=null&&count - - - - - -experimental store implementation - - -