Deleted store implementations rapid and quick, as they are replaced by the AMQStore implementation

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515054 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-06 10:25:48 +00:00
parent aee4378a8a
commit 759fd2829c
28 changed files with 661 additions and 4083 deletions

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import java.io.File;
import java.io.IOException;
import java.util.Set;
@ -38,22 +39,30 @@ public interface PersistenceAdapter extends Service {
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
* objects that the persistence store is aware exist.
*
* @return
* @return active destinations
*/
public Set<ActiveMQDestination> getDestinations();
/**
* Factory method to create a new queue message store with the given destination name
* @param destination
* @return the message store
* @throws IOException
*/
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
/**
* Factory method to create a new topic message store with the given destination name
* @param destination
* @return the topic message store
* @throws IOException
*/
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
/**
* Factory method to create a new persistent prepared transaction store for XA recovery
* @return transaction store
* @throws IOException
*/
public TransactionStore createTransactionStore() throws IOException;
@ -66,27 +75,33 @@ public interface PersistenceAdapter extends Service {
* real high performance its usually faster to perform many writes within the same
* transaction to minimize latency caused by disk synchronization. This is especially
* true when using tools like Berkeley Db or embedded JDBC servers.
* @param context
* @throws IOException
*/
public void beginTransaction(ConnectionContext context) throws IOException;
/**
* Commit a persistence transaction
* @param context
* @throws IOException
*
* @see PersistenceAdapter#beginTransaction()
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
public void commitTransaction(ConnectionContext context) throws IOException;
/**
* Rollback a persistence transaction
* @param context
* @throws IOException
*
* @see PersistenceAdapter#beginTransaction()
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
public void rollbackTransaction(ConnectionContext context) throws IOException;
/**
*
* @return
* @return last broker sequence
* @throws IOException
*/
public long getLastMessageBrokerSequenceId() throws IOException;
@ -102,4 +117,24 @@ public interface PersistenceAdapter extends Service {
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
/**
* Set the name of the broker using the adapter
* @param brokerName
*/
public void setBrokerName(String brokerName);
/**
* Set the directory where any data files should be created
* @param dir
*/
public void setDirectory(File dir);
/**
* checkpoint any
* @param sync
* @throws IOException
*
*/
public void checkpoint(boolean sync) throws IOException;
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.activemq.store;
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
import org.springframework.beans.factory.FactoryBean;
/**
@ -26,7 +27,7 @@ import org.springframework.beans.factory.FactoryBean;
*
* @version $Revision: 1.1 $
*/
public class PersistenceAdapterFactoryBean extends DefaultPersistenceAdapterFactory implements FactoryBean {
public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean {
private PersistenceAdapter persistenceAdaptor;

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IOHelper;
/**
* An implementation of {@link PersistenceAdapterFactory}
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{
private TaskRunnerFactory taskRunnerFactory;
private File dataDirectory;
private int journalThreadPriority = Thread.MAX_PRIORITY;
private String brokerName="localhost";
/**
* @return a AMQPersistenceAdapter
* @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter()
*/
public PersistenceAdapter createPersistenceAdapter(){
AMQPersistenceAdapter result = new AMQPersistenceAdapter();
result.setDirectory(getDataDirectory());
result.setTaskRunnerFactory(getTaskRunnerFactory());
result.setBrokerName(getBrokerName());
return result;
}
/**
* @return the dataDirectory
*/
public File getDataDirectory(){
if(this.dataDirectory==null){
this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
}
return this.dataDirectory;
}
/**
* @param dataDirectory the dataDirectory to set
*/
public void setDataDirectory(File dataDirectory){
this.dataDirectory=dataDirectory;
}
/**
* @return the taskRunnerFactory
*/
public TaskRunnerFactory getTaskRunnerFactory(){
if( taskRunnerFactory == null ) {
taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000);
}
return taskRunnerFactory;
}
/**
* @param taskRunnerFactory the taskRunnerFactory to set
*/
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
this.taskRunnerFactory=taskRunnerFactory;
}
/**
* @return the journalThreadPriority
*/
public int getJournalThreadPriority(){
return this.journalThreadPriority;
}
/**
* @param journalThreadPriority the journalThreadPriority to set
*/
public void setJournalThreadPriority(int journalThreadPriority){
this.journalThreadPriority=journalThreadPriority;
}
/**
* @return the brokerName
*/
public String getBrokerName(){
return this.brokerName;
}
/**
* @param brokerName the brokerName to set
*/
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFactory;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
@ -483,7 +484,16 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
return new DefaultDatabaseLocker(getDataSource(), getStatements());
}
public void setBrokerName(String brokerName){
}
public String toString(){
return "JDBCPersistenceAdaptor("+super.toString()+")";
}
public void setDirectory(File dir){
}
public void checkpoint(boolean sync) throws IOException{
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.activemq.store.journal;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@ -314,6 +315,10 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
log.warn("Request to start checkpoint failed: " + e, e);
}
}
public void checkpoint(boolean sync) {
checkpoint(sync,sync);
}
/**
* This does the actual checkpoint.
@ -666,8 +671,15 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
public void setBrokerName(String brokerName){
longTermPersistence.setBrokerName(brokerName);
}
public String toString(){
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
}
public void setDirectory(File dir){
}
}

View File

@ -15,22 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
package org.apache.activemq.store.journal;
import java.io.File;
import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,11 +38,11 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.4 $
*/
public class DefaultPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
private static final Log log = LogFactory.getLog(DefaultPersistenceAdapterFactory.class);
private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
private int journalLogFileSize = 1024*1024*20;
private int journalLogFiles = 2;
@ -62,15 +61,8 @@ public class DefaultPersistenceAdapterFactory extends DataSourceSupport implemen
if( !useJournal ) {
return jdbcPersistenceAdapter;
}
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
// Setup the Journal
// if( useQuickJournal ) {
// return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
// } else {
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File(IOHelper.getDefaultStoreDirectory()));
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
//return new JournalPersistenceAdapter(getJournal(), adaptor, getTaskRunnerFactory());
// }
}
public int getJournalLogFiles() {

View File

@ -74,10 +74,15 @@ public class KahaMessageStore implements MessageStore{
removeMessage(ack.getLastMessageId());
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId);
if(messageContainer.isEmpty()){
resetBatching();
StoreEntry entry=messageContainer.getEntry(msgId);
if(entry!=null){
messageContainer.remove(entry);
if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
resetBatching();
}
}
}

View File

@ -41,38 +41,30 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class KahaPersistenceAdapter implements PersistenceAdapter{
private static final int STORE_LOCKED_WAIT_DELAY = 10*1000;
private static final int STORE_LOCKED_WAIT_DELAY=10*1000;
private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore;
ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
ConcurrentHashMap<ActiveMQQueue, MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
ConcurrentHashMap<ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,TopicMessageStore>();
ConcurrentHashMap<ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,MessageStore>();
ConcurrentHashMap<ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination,MessageStore>();
protected OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength=32*1024*1024;
private File dir;
private File directory;
private String brokerName;
private Store theStore;
public KahaPersistenceAdapter(File dir) throws IOException{
if(!dir.exists()){
dir.mkdirs();
}
this.dir=dir;
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
}
private boolean initialized;
public Set<ActiveMQDestination> getDestinations(){
Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
@ -81,7 +73,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add((ActiveMQDestination) obj);
rc.add((ActiveMQDestination)obj);
}
}
}catch(IOException e){
@ -127,25 +119,25 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
while (true) {
try {
Store store=getStore();
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
container.load();
transactionStore=new KahaTransactionStore(this,container);
break;
}catch(StoreLockedExcpetion e) {
log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)+" seconds for the Store to be unlocked.");
while(true){
try{
Store store=getStore();
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
container.load();
transactionStore=new KahaTransactionStore(this,container);
break;
}catch(StoreLockedExcpetion e){
log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)
+" seconds for the Store to be unlocked.");
try{
Thread.sleep(STORE_LOCKED_WAIT_DELAY);
}catch(InterruptedException e1){
}
}
}
}
}
return transactionStore;
}
@ -163,6 +155,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
public void start() throws Exception{
initialize();
}
public void stop() throws Exception{
@ -182,37 +175,37 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}else{
theStore.delete();
}
}else {
}else{
StoreFactory.delete(getStoreName());
}
}
protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, Message> container=store.getMapContainer(id,containerName);
MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
container.setValueMarshaller(new MessageMarshaller(wireFormat));
container.setValueMarshaller(new MessageMarshaller(wireFormat));
container.load();
return container;
}
protected MapContainer<String,Object> getSubsMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<String, Object> container=store.getMapContainer(id,containerName);
MapContainer<String,Object> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(Store.StringMarshaller);
container.setValueMarshaller(createMessageMarshaller());
container.setValueMarshaller(createMessageMarshaller());
container.load();
return container;
}
protected Marshaller<Object> createMessageMarshaller() {
return new CommandMarshaller(wireFormat);
}
protected Marshaller<Object> createMessageMarshaller(){
return new CommandMarshaller(wireFormat);
}
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
container.setMarshaller(createMessageMarshaller());
container.setMarshaller(createMessageMarshaller());
container.load();
return container;
}
@ -239,8 +232,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
this.maxDataFileLength=maxDataFileLength;
}
protected synchronized Store getStore() throws IOException{
if(theStore==null){
theStore=StoreFactory.open(getStoreName(),"rw");
@ -248,13 +239,50 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
return theStore;
}
private String getStoreName(){
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
return name;
initialize();
return directory.getAbsolutePath();
}
public String toString(){
return "KahaPersistenceAdapter("+getStoreName()+")";
}
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}
public String toString(){
return "KahaPersistenceAdapter(" + getStoreName() +")";
public String getBrokerName(){
return brokerName;
}
public File getDirectory(){
return this.directory;
}
public void setDirectory(File directory){
this.directory=directory;
}
public void checkpoint(boolean sync) throws IOException{
if(sync){
getStore().force();
}
}
private void initialize(){
if(!initialized){
initialized=true;
if(this.directory==null){
this.directory=new File(IOHelper.getDefaultDataDirectory());
this.directory=new File(this.directory,brokerName+"-kahastore");
}
this.directory.mkdirs();
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
}
}
}

View File

@ -77,7 +77,9 @@ public class KahaReferenceStore implements ReferenceStore{
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
if (entry != null) {
entry=messageContainer.getNext(entry);
}
}
if(entry!=null){
int count=0;
@ -120,11 +122,14 @@ public class KahaReferenceStore implements ReferenceStore{
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
ReferenceRecord rr=messageContainer.remove(msgId);
if(rr!=null){
removeInterest(rr);
if(messageContainer.isEmpty()){
resetBatching();
StoreEntry entry=messageContainer.getEntry(msgId);
if(entry!=null){
ReferenceRecord rr=messageContainer.remove(msgId);
if(rr!=null){
removeInterest(rr);
if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
resetBatching();
}
}
}
}

View File

@ -17,10 +17,8 @@
*/
package org.apache.activemq.store.kahadaptor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -29,7 +27,6 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
@ -39,7 +36,6 @@ import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -51,10 +47,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
private boolean storeValid;
public KahaReferenceStoreAdapter(File dir) throws IOException {
super(dir);
}
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
throw new RuntimeException("Use createQueueReferenceStore instead");
}
@ -164,10 +157,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
}
public void sync() throws IOException {
getStore().force();
}
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);

View File

@ -119,8 +119,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
/*
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
@ -128,6 +129,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
container.add(ref);
}
}
*/
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{

View File

@ -17,7 +17,6 @@ package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -85,7 +84,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.add(ref);
StoreEntry listEntry = container.add(ref);
}
}
}
@ -118,8 +118,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
ConsumerMessageRef ref=container.remove();
if(container.isEmpty()){
@ -140,6 +140,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
removeInterest(rr);
}
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
@ -163,13 +164,15 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
/*
for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
ref.setMessageEntry(tsa.getMessageEntry());
container.add(ref);
}
*/
}
}
@ -186,7 +189,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.size();
return container != null ? container.size() : 0;
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
@ -226,6 +229,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){

View File

@ -54,14 +54,20 @@ import org.apache.activemq.kaha.StoreEntry;
return listContainer.isEmpty();
}
public void add(ConsumerMessageRef ref) {
listContainer.add(ref);
public StoreEntry add(ConsumerMessageRef ref) {
return listContainer.placeLast(ref);
}
public ConsumerMessageRef remove() {
ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst();
if (listContainer.isEmpty()) {
reset();
public ConsumerMessageRef remove(){
ConsumerMessageRef result=null;
if(!listContainer.isEmpty()){
StoreEntry entry=listContainer.getFirst();
if(entry!=null){
result=(ConsumerMessageRef)listContainer.removeFirst();
if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
reset();
}
}
}
return result;
}

View File

@ -158,4 +158,13 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
public String toString(){
return "MemoryPersistenceAdapter";
}
public void setBrokerName(String brokerName){
}
public void setDirectory(File dir){
}
public void checkpoint(boolean sync) throws IOException{
}
}

View File

@ -1,495 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.14 $
*/
public class QuickMessageStore implements MessageStore {
private static final Log log = LogFactory.getLog(QuickMessageStore.class);
protected final QuickPersistenceAdapter peristenceAdapter;
protected final QuickTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
protected Location lastLocation;
protected Location lastWrittenLocation;
protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private final AtomicReference<Location> mark = new AtomicReference<Location>();
public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
public boolean iterate() {
asyncWrite();
return false;
}}, "Checkpoint: "+destination);
}
public void setUsageManager(UsageManager usageManager) {
referenceStore.setUsageManager(usageManager);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final boolean debug = log.isDebugEnabled();
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message add for: "+id+", at: "+location);
addMessage(message, location);
} else {
if( debug )
log.debug("Journalled transacted message add for: "+id+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message add commit for: "+id+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
addMessage(message, location);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message add rollback for: "+id+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
private void addMessage(final Message message, final Location location) throws InterruptedIOException {
ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
synchronized (this) {
lastLocation = location;
messages.put(message.getMessageId(), data);
}
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
MessageId id = message.getMessageId();
try {
// Only add the message if it has not already been added.
ReferenceData data = referenceStore.getMessageReference(id);
if( data==null ) {
data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
return true;
}
}
catch (Throwable e) {
log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e);
}
return false;
}
/**
*/
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
removeMessage(ack, location);
} else {
if( debug )
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
ReferenceData data;
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
data = messages.remove(id);
if (data == null) {
messageAcks.add(ack);
}
}
if (data == null) {
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
}
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
if( t!=null ) {
referenceStore.removeMessage(context, messageAck);
return true;
}
}
catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
/**
* Waits till the lastest data has landed on the referenceStore
* @throws InterruptedIOException
*/
public void flush() throws InterruptedIOException {
log.debug("flush");
CountDownLatch countDown;
synchronized(this) {
if( lastWrittenLocation == lastLocation ) {
return;
}
if( flushLatch== null ) {
flushLatch = new CountDownLatch(1);
}
countDown = flushLatch;
}
try {
asyncWriteTask.wakeup();
countDown.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
/**
* @return
* @throws IOException
*/
private void asyncWrite() {
try {
CountDownLatch countDown;
synchronized(this) {
countDown = flushLatch;
flushLatch = null;
}
mark.set(doAsyncWrite());
if ( countDown != null ) {
countDown.countDown();
}
} catch (IOException e) {
log.error("Checkpoint failed: "+e, e);
}
}
/**
* @return
* @throws IOException
*/
protected Location doAsyncWrite() throws IOException {
final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>();
lastLocation = this.lastLocation;
}
if( log.isDebugEnabled() )
log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "+cpRemovedMessageLocations.size()+" ");
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
while (iterator.hasNext()) {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
size ++;
// Commit the batch if it's getting too big
if( size >= maxCheckpointMessageAddSize ) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
size=0;
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
for (MessageAck ack : cpRemovedMessageLocations) {
try {
referenceStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
}
});
log.debug("Batch update done.");
synchronized (this) {
cpAddedMessageIds = null;
lastWrittenLocation = lastLocation;
}
if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
} else {
return lastLocation;
}
}
/**
*
*/
public Message getMessage(MessageId identity) throws IOException {
ReferenceData data=null;
synchronized (this) {
// Is it still in flight???
data = messages.get(identity);
if( data==null && cpAddedMessageIds!=null ) {
data = cpAddedMessageIds.get(identity);
}
}
if( data==null ) {
data = referenceStore.getMessageReference(identity);
if( data==null ) {
return null;
}
}
Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
DataStructure rc = peristenceAdapter.readCommand(location);
try {
return (Message) rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc);
}
}
/**
* Replays the referenceStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
flush();
referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
public void start() throws Exception {
referenceStore.start();
}
public void stop() throws Exception {
asyncWriteTask.shutdown();
referenceStore.stop();
}
/**
* @return Returns the longTermStore.
*/
public ReferenceStore getReferenceStore() {
return referenceStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException {
flush();
referenceStore.removeAllMessages(context);
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references.");
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException{
flush();
return referenceStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
flush();
referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
}
public void resetBatching(){
referenceStore.resetBatching();
}
public Location getMark() {
return mark.get();
}
}

View File

@ -1,679 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.store.quick.QuickTransactionStore.Tx;
import org.apache.activemq.store.quick.QuickTransactionStore.TxOperation;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListener {
private static final Log log = LogFactory.getLog(QuickPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, QuickMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, QuickMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, QuickMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, QuickMessageStore>();
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
private long cleanupInterval = 1000 * 60;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024*4;
private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private File directory = new File(IOHelper.getDefaultDataDirectory() + "/quick");
public synchronized void start() throws Exception {
if( !started.compareAndSet(false, true) )
return;
this.usageManager.addUsageListener(this);
if( asyncDataManager == null ) {
asyncDataManager = createAsyncDataManager();
}
if( referenceStoreAdapter==null ) {
referenceStoreAdapter = createReferenceStoreAdapter();
}
referenceStoreAdapter.setUsageManager(usageManager);
if( taskRunnerFactory==null ) {
taskRunnerFactory = createTaskRunnerFactory();
}
asyncDataManager.start();
if( deleteAllMessages ) {
asyncDataManager.delete();
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED "+new Date());
Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
asyncDataManager.setMark(location, true);
log.info("Journal deleted: ");
deleteAllMessages=false;
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
log.info("Active data files: "+files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() {
doCheckpoint();
return false;
}
}, "ActiveMQ Journal Checkpoint Worker");
createTransactionStore();
recover();
// Do a checkpoint periodically.
periodicCheckpointTask = new Runnable() {
public void run() {
checkpoint(false);
}
};
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
}
public void stop() throws Exception {
if( !started.compareAndSet(true, false) )
return;
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
QuickMessageStore ms = iterator.next();
ms.stop();
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
ms.stop();
}
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
checkpointTask.shutdown();
queues.clear();
topics.clear();
IOException firstException = null;
referenceStoreAdapter.stop();
try {
log.debug("Journal close");
asyncDataManager.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
if (firstException != null) {
throw firstException;
}
}
/**
* When we checkpoint we move all the journalled data to long term storage.
* @param stopping
*
* @param b
*/
public void checkpoint(boolean sync) {
try {
if (asyncDataManager == null )
throw new IllegalStateException("Journal is closed.");
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
}
checkpointTask.wakeup();
if (sync) {
log.debug("Waitng for checkpoint to complete.");
latch.await();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e);
}
}
/**
* This does the actual checkpoint.
* @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
}
try {
log.debug("Checkpoint started.");
Location newMark = null;
Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
final QuickMessageStore ms = iterator.next();
Location mark = (Location) ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
Location mark = (Location) ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
try {
if (newMark != null) {
log.debug("Marking journal at: " + newMark);
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT "+new Date(), true);
}
}
catch (Exception e) {
log.error("Failed to mark the Journal: " + e, e);
}
// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
// // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
// }
// }
log.debug("Checkpoint done.");
}
finally {
latch.countDown();
}
return true;
}
/**
* Cleans up the data files
* @return
* @throws IOException
*/
public void cleanup() {
try {
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
} catch (IOException e) {
log.error("Could not cleanup data files: "+e, e);
}
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue) destination);
}
else {
return createTopicMessageStore((ActiveMQTopic) destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
QuickMessageStore store = queues.get(destination);
if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new QuickMessageStore(this, checkpointStore, destination);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
queues.put(destination, store);
}
return store;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
QuickTopicMessageStore store = (QuickTopicMessageStore) topics.get(destinationName);
if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
topics.put(destinationName, store);
}
return store;
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(Location location) throws IOException {
try {
ByteSequence packet = asyncDataManager.read(location);
return (DataStructure) wireFormat.unmarshal(packet);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
Location pos = null;
int redoCounter = 0;
log.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
ByteSequence data = asyncDataManager.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
if (c instanceof Message ) {
Message message = (Message) c;
QuickMessageStore store = (QuickMessageStore) createMessageStore(message.getDestination());
if ( message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
}
else {
if( store.replayAddMessage(context, message, pos) ) {
redoCounter++;
}
}
} else {
switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE:
{
JournalQueueAck command = (JournalQueueAck) c;
QuickMessageStore store = (QuickMessageStore) createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
}
else {
if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
redoCounter++;
}
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE:
{
JournalTopicAck command = (JournalTopicAck) c;
QuickTopicMessageStore store = (QuickTopicMessageStore) createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
}
else {
if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
redoCounter++;
}
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE:
{
JournalTransaction command = (JournalTransaction) c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
redoCounter++;
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
redoCounter++;
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck) op.data;
if( ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
redoCounter++;
}
}
}
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
}
}
catch (IOException e) {
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace) c;
log.debug("TRACE Entry: " + trace.getMessage());
break;
default:
log.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
Location location = writeTraceMessage("RECOVERED "+new Date(), true);
asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
}
private IOException createReadException(Location location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
*
* @param command
* @param sync
* @return
* @throws IOException
*/
public Location writeCommand(DataStructure command, boolean sync) throws IOException {
return asyncDataManager.write(wireFormat.marshal(command), sync);
}
private Location writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
checkpoint(false);
}
}
public QuickTransactionStore getTransactionStore() {
return transactionStore;
}
public void deleteAllMessages() throws IOException {
deleteAllMessages=true;
}
public String toString(){
return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")";
}
///////////////////////////////////////////////////////////////////
// Subclass overridables
///////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager();
manager.setDirectory(new File(directory, "journal"));
return manager;
}
protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
return adaptor;
}
protected TaskRunnerFactory createTaskRunnerFactory() {
return DefaultThreadPools.getDefaultTaskRunnerFactory();
}
///////////////////////////////////////////////////////////////////
// Property Accessors
///////////////////////////////////////////////////////////////////
public AsyncDataManager getAsyncDataManager() {
return asyncDataManager;
}
public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
this.asyncDataManager = asyncDataManager;
}
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
public UsageManager getUsageManager() {
return usageManager;
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
this.maxCheckpointWorkers = maxCheckpointWorkers;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
}

View File

@ -1,211 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.13 $
*/
public class QuickTopicMessageStore extends QuickMessageStore implements TopicMessageStore {
private static final Log log = LogFactory.getLog(QuickTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
super(adapter, topicReferenceStore, destinationName);
this.topicReferenceStore = topicReferenceStore;
}
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
flush();
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{
flush();
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener));
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
flush();
topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
}
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
final Location location = peristenceAdapter.writeCommand(ack, false);
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
acknowledge(messageId, location, key);
} else {
if( debug )
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
synchronized (QuickTopicMessageStore.this) {
inFlightTxLocations.remove(location);
acknowledge(messageId, location, key);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
synchronized (QuickTopicMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
}
public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
try {
SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
if( sub != null ) {
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
return true;
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
/**
* @param messageId
* @param location
* @param key
* @throws InterruptedIOException
*/
private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
synchronized(this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
}
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
@Override
protected Location doAsyncWrite() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
Location location = super.doAsyncWrite();
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
}
}
} );
return location;
}
/**
* @return Returns the longTermStore.
*/
public TopicReferenceStore getTopicReferenceStore() {
return topicReferenceStore;
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
topicReferenceStore.deleteSubscription(clientId, subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return topicReferenceStore.getAllSubscriptions();
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
flush();
return topicReferenceStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName) {
topicReferenceStore.resetBatching(clientId,subscriptionName);
}
}

View File

@ -1,340 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
/**
*/
public class QuickTransactionStore implements TransactionStore {
private final QuickPersistenceAdapter peristenceAdapter;
Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public QuickMessageStore store;
public Object data;
public Location location;
public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
this.location=location;
}
}
/**
* Operations
* @version $Revision: 1.6 $
*/
public static class Tx {
private final Location location;
private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(Location location) {
this.location=location;
}
public void add(QuickMessageStore store, Message msg, Location location) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
}
public void add(QuickMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
}
public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList<TxOperation> getOperations() {
return operations;
}
}
public QuickTransactionStore(QuickPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
public Tx getTx(TransactionId txid,Location location){
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.get(txid);
}
if(tx==null){
tx=new Tx(location);
inflightTransactions.put(txid,tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx;
if(wasPrepared){
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
}
if(tx==null)
return;
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
true);
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
if(wasPrepared){
synchronized(preparedTransactions){
return preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
return inflightTransactions.remove(txid);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx!=null)
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
if(tx!=null){
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
true);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void replayRollback(TransactionId txid) throws IOException{
boolean inflight=false;
synchronized(inflightTransactions){
inflight=inflightTransactions.remove(txid)!=null;
}
if(inflight){
synchronized(preparedTransactions){
preparedTransactions.remove(txid);
}
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back..
synchronized(inflightTransactions){
inflightTransactions.clear();
}
this.doingRecover=true;
try{
Map<TransactionId, Tx> txs=null;
synchronized(preparedTransactions){
txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
}
for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
Object txid=iter.next();
Tx tx=txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
this.doingRecover=false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message, location);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(QuickMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public void acknowledge(QuickTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public Location checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
Location rc=null;
synchronized(inflightTransactions){
for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
}
synchronized(preparedTransactions){
for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
return rc;
}
}
public boolean isDoingRecover() {
return doingRecover;
}
}

View File

@ -1,59 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
final class RecoveryListenerAdapter implements MessageRecoveryListener {
static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store;
private final MessageRecoveryListener listener;
RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {
this.store = store;
this.listener = listener;
}
public void finished() {
listener.finished();
}
public boolean hasSpace() {
return listener.hasSpace();
}
public void recoverMessage(Message message) throws Exception {
listener.recoverMessage(message);
}
public void recoverMessageReference(MessageId ref) throws Exception {
Message message = this.store.getMessage(ref);
if( message !=null ){
listener.recoverMessage( message );
} else {
log.error("Message id "+ref+" could not be recovered from the data store!");
}
}
}

View File

@ -1,25 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
experimental store implementation
</body>
</html>

View File

@ -1,45 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.rapid;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
public class RapidMessageReference {
public final MessageId messageId;
public final Location location;
public RapidMessageReference(MessageId messageId, Location location) {
this.messageId = messageId;
this.location=location;
}
public RapidMessageReference(Message message, Location location) {
this.messageId = message.getMessageId();
this.location=location;
}
public MessageId getMessageId() {
return messageId;
}
public Location getLocation() {
return location;
}
}

View File

@ -1,46 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.Marshaller;
public class RapidMessageReferenceMarshaller implements Marshaller{
public Object readPayload(DataInput dataIn) throws IOException{
MessageId mid = new MessageId(dataIn.readUTF());
Location loc = new Location(dataIn.readInt(),dataIn.readInt());
RapidMessageReference rmr = new RapidMessageReference(mid,loc);
return rmr;
}
public void writePayload(Object object,DataOutput dataOut) throws IOException{
RapidMessageReference rmr = (RapidMessageReference)object;
dataOut.writeUTF(rmr.getMessageId().toString());
dataOut.writeInt(rmr.getLocation().getLogFileId());
dataOut.writeInt(rmr.getLocation().getLogFileOffset());
}
}

View File

@ -1,379 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.14 $
*/
public class RapidMessageStore implements MessageStore, UsageListener {
private static final Log log = LogFactory.getLog(RapidMessageStore.class);
protected final RapidPersistenceAdapter peristenceAdapter;
protected final RapidTransactionStore transactionStore;
protected final ListContainer messageContainer;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
protected final LRUCache cache;
protected UsageManager usageManager;
protected StoreEntry batchEntry = null;
protected Location lastLocation;
protected HashSet inFlightTxLocations = new HashSet();
public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, ListContainer container, int maximumCacheSize) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.messageContainer = container;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
// populate the cache
StoreEntry entry=messageContainer.getFirst();
int count = 0;
if(entry!=null){
do{
RapidMessageReference msg = (RapidMessageReference)messageContainer.get(entry);
cache.put(msg.getMessageId(),entry);
entry = messageContainer.getNext(entry);
count++;
}while(entry!=null && count < maximumCacheSize);
}
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public synchronized void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final boolean debug = log.isDebugEnabled();
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
final RapidMessageReference md = new RapidMessageReference(message, location);
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message add for: "+id+", at: "+location);
addMessage(md);
} else {
message.incrementReferenceCount();
if( debug )
log.debug("Journalled transacted message add for: "+id+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message add commit for: "+id+", at: "+location);
message.decrementReferenceCount();
synchronized( RapidMessageStore.this ) {
inFlightTxLocations.remove(location);
addMessage(md);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message add rollback for: "+id+", at: "+location);
message.decrementReferenceCount();
synchronized( RapidMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
private synchronized void addMessage(final RapidMessageReference messageReference){
StoreEntry item=messageContainer.placeLast(messageReference);
cache.put(messageReference.getMessageId(),item);
}
static protected String toString(Location location) {
Location l = (Location) location;
return l.getLogFileId()+":"+l.getLogFileOffset();
}
static protected Location toLocation(String t) {
String[] strings = t.split(":");
if( strings.length!=2 )
throw new IllegalArgumentException("Invalid location: "+t);
return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
}
public void replayAddMessage(ConnectionContext context, Message message, Location location) {
try {
RapidMessageReference messageReference = new RapidMessageReference(message, location);
addMessage(messageReference);
}
catch (Throwable e) {
log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
}
}
/**
*/
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
removeMessage(ack.getLastMessageId());
} else {
if( debug )
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
synchronized( RapidMessageStore.this ) {
inFlightTxLocations.remove(location);
removeMessage(ack.getLastMessageId());
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
synchronized( RapidMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
StoreEntry entry=(StoreEntry)cache.remove(msgId);
if(entry!=null){
entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
if(msg.getMessageId().equals(msgId)){
messageContainer.remove(entry);
break;
}
}
}
if (messageContainer.isEmpty()) {
resetBatching();
}
}
public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
try {
MessageId id = ack.getLastMessageId();
removeMessage(id);
}
catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
}
public synchronized Message getMessage(MessageId identity) throws IOException{
RapidMessageReference result=null;
StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (RapidMessageReference)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
if(msg.getMessageId().equals(identity)){
result=msg;
cache.put(identity,entry);
break;
}
}
}
if (result == null )
return null;
return (Message) peristenceAdapter.readCommand(result.getLocation());
}
/**
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws Exception
*/
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
RapidMessageReference messageReference=(RapidMessageReference) iter.next();
Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m);
}
listener.finished();
}
public void start() {
if( this.usageManager != null )
this.usageManager.addUsageListener(this);
}
public void stop() {
if( this.usageManager != null )
this.usageManager.removeUsageListener(this);
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
messageContainer.clear();
cache.clear();
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("Does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("Does not support message references.");
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
/**
* @return
* @throws IOException
*/
public Location checkpoint() throws IOException {
ArrayList cpActiveJournalLocations;
// swap out the message hash maps..
synchronized (this) {
cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
}
if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations);
return (Location) cpActiveJournalLocations.get(0);
} else {
return lastLocation;
}
}
public int getMessageCount(){
return messageContainer.size();
}
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry=batchEntry;
if(entry==null){
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
}
if(entry!=null){
int count=0;
do{
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(entry);
Message msg=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
if(msg!=null){
Message message=(Message)msg;
listener.recoverMessage(message);
count++;
}
batchEntry=entry;
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
public void resetBatching(){
batchEntry = null;
}
/**
* @return true if the store supports cursors
*/
public boolean isSupportForCursors() {
return true;
}
public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if (newPercentUsage == 100) {
cache.clear();
}
}
}

View File

@ -1,656 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.Location;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller;
import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
* asynchronously on a timeout with some other long term persistent storage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class RapidPersistenceAdapter implements PersistenceAdapter,JournalEventListener,UsageListener{
private static final Log log=LogFactory.getLog(RapidPersistenceAdapter.class);
private final Journal journal;
private final WireFormat wireFormat=new OpenWireFormat();
private final ConcurrentHashMap queues=new ConcurrentHashMap();
private final ConcurrentHashMap topics=new ConcurrentHashMap();
private long checkpointInterval=1000*60*5;
private long lastCheckpointRequest=System.currentTimeMillis();
private int maxCheckpointWorkers=10;
private int maxCheckpointMessageAddSize=5000;
private RapidTransactionStore transactionStore=new RapidTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
private boolean fullCheckPoint;
private AtomicBoolean started=new AtomicBoolean(false);
Store store;
private boolean useExternalMessageReferences;
private final Runnable periodicCheckpointTask=createPeriodicCheckpointTask();
private int maximumDestinationCacheSize=2000;
final Runnable createPeriodicCheckpointTask(){
return new Runnable(){
public void run(){
if(System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval){
checkpoint(false,true);
}
}
};
}
public RapidPersistenceAdapter(Journal journal,TaskRunnerFactory taskRunnerFactory) throws IOException{
this.journal=journal;
journal.setJournalEventListener(this);
File dir=((JournalImpl)journal).getLogDirectory();
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw");
checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate(){
return doCheckpoint();
}
},"ActiveMQ Checkpoint Worker");
}
public Set getDestinations(){
Set rc=new HashSet();
try{
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
}
}
}catch(IOException e){
log.error("Failed to get destinations ",e);
}
return rc;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
if(destination.isQueue()){
return createQueueMessageStore((ActiveMQQueue)destination);
}else{
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
RapidMessageStore store=(RapidMessageStore)queues.get(destination);
if(store==null){
ListContainer messageContainer=getListContainer(destination,"topic-data");
store=new RapidMessageStore(this,destination,messageContainer,maximumDestinationCacheSize);
queues.put(destination,store);
}
return store;
}
protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
MapContainer container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
if(useExternalMessageReferences){
container.setValueMarshaller(new StringMarshaller());
}else{
container.setValueMarshaller(new CommandMarshaller(wireFormat));
}
container.load();
return container;
}
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
container.setMarshaller(new RapidMessageReferenceMarshaller());
container.load();
return container;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore)topics.get(destination);
if(rc==null){
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new RapidTopicMessageStore(this,store,messageContainer,ackContainer,subsContainer,destination,
maximumDestinationCacheSize);
topics.put(destination,rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException{
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException{
// TODO: implement this.
return 0;
}
public void beginTransaction(ConnectionContext context) throws IOException{
}
public void commitTransaction(ConnectionContext context) throws IOException{
}
public void rollbackTransaction(ConnectionContext context) throws IOException{
}
public synchronized void start() throws Exception{
if(!started.compareAndSet(false,true))
return;
checkpointExecutor=new ThreadPoolExecutor(maxCheckpointWorkers,maxCheckpointWorkers,30,TimeUnit.SECONDS,
new LinkedBlockingQueue(),new ThreadFactory(){
public Thread newThread(Runnable runable){
Thread t=new Thread(runable,"Journal checkpoint worker");
t.setPriority(7);
return t;
}
});
// checkpointExecutor.allowCoreThreadTimeOut(true);
createTransactionStore();
recover();
// Do a checkpoint periodically.
Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval/10);
}
public void stop() throws Exception{
if(!started.compareAndSet(true,false))
return;
Scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing.
checkpoint(false,true);
checkpointTask.shutdown();
checkpointExecutor.shutdown();
queues.clear();
topics.clear();
IOException firstException=null;
try{
journal.close();
}catch(Exception e){
firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
}
store.close();
if(firstException!=null){
throw firstException;
}
}
// Properties
// -------------------------------------------------------------------------
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat(){
return wireFormat;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
* The Journal give us a call back so that we can move old data out of the journal. Taking a checkpoint does this
* for us.
*
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/
public void overflowNotification(RecordLocation safeLocation){
checkpoint(false,true);
}
/**
* When we checkpoint we move all the journalled data to long term storage.
*
* @param stopping
*
* @param b
*/
public void checkpoint(boolean sync,boolean fullCheckpoint){
try{
if(journal==null)
throw new IllegalStateException("Journal is closed.");
long now=System.currentTimeMillis();
CountDownLatch latch=null;
synchronized(this){
latch=nextCheckpointCountDownLatch;
lastCheckpointRequest=now;
if(fullCheckpoint){
this.fullCheckPoint=true;
}
}
checkpointTask.wakeup();
if(sync){
log.debug("Waking for checkpoint to complete.");
latch.await();
}
}catch(InterruptedException e){
log.warn("Request to start checkpoint failed: "+e,e);
}
}
/**
* This does the actual checkpoint.
*
* @return
*/
public boolean doCheckpoint(){
CountDownLatch latch=null;
boolean fullCheckpoint;
synchronized(this){
latch=nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch=new CountDownLatch(1);
fullCheckpoint=this.fullCheckPoint;
this.fullCheckPoint=false;
}
try{
log.debug("Checkpoint started.");
RecordLocation newMark=null;
ArrayList futureTasks=new ArrayList(queues.size()+topics.size());
//
// We do many partial checkpoints (fullCheckpoint==false) to move topic messages
// to long term store as soon as possible.
//
// We want to avoid doing that for queue messages since removes the come in the same
// checkpoint cycle will nullify the previous message add. Therefore, we only
// checkpoint queues on the fullCheckpoint cycles.
//
if(fullCheckpoint){
Iterator iterator=queues.values().iterator();
while(iterator.hasNext()){
try{
final RapidMessageStore ms=(RapidMessageStore)iterator.next();
FutureTask task=new FutureTask(new Callable(){
public Object call() throws Exception{
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
}catch(Exception e){
log.error("Failed to checkpoint a message store: "+e,e);
}
}
}
Iterator iterator=topics.values().iterator();
while(iterator.hasNext()){
try{
final RapidTopicMessageStore ms=(RapidTopicMessageStore)iterator.next();
FutureTask task=new FutureTask(new Callable(){
public Object call() throws Exception{
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
}catch(Exception e){
log.error("Failed to checkpoint a message store: "+e,e);
}
}
try{
for(Iterator iter=futureTasks.iterator();iter.hasNext();){
FutureTask ft=(FutureTask)iter.next();
RecordLocation mark=(RecordLocation)ft.get();
// We only set a newMark on full checkpoints.
if(fullCheckpoint){
if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
newMark=mark;
}
}
}
}catch(Throwable e){
log.error("Failed to checkpoint a message store: "+e,e);
}
if(fullCheckpoint){
try{
if(newMark!=null){
log.debug("Marking journal at: "+newMark);
journal.setMark(newMark,true);
}
}catch(Exception e){
log.error("Failed to mark the Journal: "+e,e);
}
// TODO: do we need to implement a periodic clean up?
// if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
// }
// }
}
log.debug("Checkpoint done.");
}finally{
latch.countDown();
}
synchronized(this){
return this.fullCheckPoint;
}
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(RecordLocation location) throws IOException{
try{
Packet data=journal.read(location);
return (DataStructure)wireFormat.unmarshal(toByteSequence(data));
}catch(InvalidRecordLocationException e){
throw createReadException(location,e);
}catch(IOException e){
throw createReadException(location,e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidRecordLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException,InvalidRecordLocationException,IOException,IOException{
Location pos=null;
int transactionCounter=0;
log.info("Journal Recovery Started.");
ConnectionContext context=new ConnectionContext();
// While we have records in the journal.
while((pos=(Location)journal.getNextRecordLocation(pos))!=null){
Packet data=journal.read(pos);
DataStructure c=(DataStructure)wireFormat.unmarshal(toByteSequence(data));
if(c instanceof Message){
Message message=(Message)c;
RapidMessageStore store=(RapidMessageStore)createMessageStore(message.getDestination());
if(message.isInTransaction()){
transactionStore.addMessage(store,message,pos);
}else{
store.replayAddMessage(context,message,pos);
transactionCounter++;
}
}else{
switch(c.getDataStructureType()){
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command=(JournalQueueAck)c;
RapidMessageStore store=(RapidMessageStore)createMessageStore(command.getDestination());
if(command.getMessageAck().isInTransaction()){
transactionStore.removeMessage(store,command.getMessageAck(),pos);
}else{
store.replayRemoveMessage(context,command.getMessageAck());
transactionCounter++;
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: {
JournalTopicAck command=(JournalTopicAck)c;
RapidTopicMessageStore store=(RapidTopicMessageStore)createMessageStore(command.getDestination());
if(command.getTransactionId()!=null){
transactionStore.acknowledge(store,command,pos);
}else{
store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
.getMessageId());
transactionCounter++;
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE: {
JournalTransaction command=(JournalTransaction)c;
try{
// Try to replay the packet.
switch(command.getType()){
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
if(tx==null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
TxOperation op=(TxOperation)iter.next();
if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
op.store.replayAddMessage(context,(Message)op.data,op.location);
}
if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
op.store.replayRemoveMessage(context,(MessageAck)op.data);
}
if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
JournalTopicAck ack=(JournalTopicAck)op.data;
((RapidTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
.getSubscritionName(),ack.getMessageId());
}
}
transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
}
}catch(IOException e){
log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace=(JournalTrace)c;
log.debug("TRACE Entry: "+trace.getMessage());
break;
default:
log.error("Unknown type of record in transaction log which will be discarded: "+c);
}
}
}
RecordLocation location=writeTraceMessage("RECOVERED",true);
journal.setMark(location,true);
log.info("Journal Recovered: "+transactionCounter+" message(s) in transactions recovered.");
}
private IOException createReadException(RecordLocation location,Exception e){
return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
}
protected IOException createWriteException(DataStructure packet,Exception e){
return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
}
protected IOException createWriteException(String command,Exception e){
return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
}
protected IOException createRecoveryFailedException(Exception e){
return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
}
/**
*
* @param command
* @param sync
* @return
* @throws IOException
*/
public Location writeCommand(DataStructure command,boolean sync) throws IOException{
if(started.get())
return (Location)journal.write(toPacket(wireFormat.marshal(command)),sync);
throw new IOException("closed");
}
private RecordLocation writeTraceMessage(String message,boolean sync) throws IOException{
JournalTrace trace=new JournalTrace();
trace.setMessage(message);
return writeCommand(trace,sync);
}
public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if(newPercentUsage>80&&oldPercentUsage<newPercentUsage){
checkpoint(false,true);
}
}
public RapidTransactionStore getTransactionStore(){
return transactionStore;
}
public void deleteAllMessages() throws IOException{
try{
JournalTrace trace=new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location=journal.write(toPacket(wireFormat.marshal(trace)),false);
journal.setMark(location,true);
log.info("Journal deleted: ");
}catch(IOException e){
throw e;
}catch(Throwable e){
throw IOExceptionSupport.create(e);
}
if(store!=null){
if(store.isInitialized()){
store.clear();
}else{
store.delete();
}
}
}
public int getMaxCheckpointMessageAddSize(){
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers(){
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
this.maxCheckpointWorkers=maxCheckpointWorkers;
}
public boolean isUseExternalMessageReferences(){
return false;
}
public void setUseExternalMessageReferences(boolean enable){
if(enable)
throw new IllegalArgumentException("The journal does not support message references.");
}
public void setUsageManager(UsageManager usageManager){
}
public Store getStore(){
return store;
}
/**
* @return the maximumDestinationCacheSize
*/
public int getMaximumDestinationCacheSize(){
return this.maximumDestinationCacheSize;
}
/**
* @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
*/
public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
this.maximumDestinationCacheSize=maximumDestinationCacheSize;
}
public Packet toPacket(ByteSequence sequence){
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data,sequence.offset,
sequence.length));
}
public ByteSequence toByteSequence(Packet packet){
org.apache.activeio.packet.ByteSequence sequence=packet.asByteSequence();
return new ByteSequence(sequence.getData(),sequence.getOffset(),sequence.getLength());
}
}

View File

@ -1,319 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadaptor.ConsumerMessageRef;
import org.apache.activemq.store.kahadaptor.ConsumerMessageRefMarshaller;
import org.apache.activemq.store.kahadaptor.TopicSubAck;
import org.apache.activemq.store.kahadaptor.TopicSubContainer;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.13 $
*/
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
private ListContainer ackContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberMessages=new ConcurrentHashMap();
public RapidTopicMessageStore(RapidPersistenceAdapter adapter, Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
super(adapter,destination,messageContainer,maximumCacheSize);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberMessageContainer(key);
}
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
final RapidMessageReference md = new RapidMessageReference(message, location);
StoreEntry messageEntry=messageContainer.placeLast(md);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
StoreEntry ackEntry=ackContainer.placeLast(tsa);
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.add(ref);
}
}
}
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
ref.setMessageEntry(tsa.getMessageEntry());
container.add(ref);
}
}
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
removeSubscriberMessageContainer(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
.getMessageEntry());
if(messageReference!=null){
Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m);
}
}
}
listener.finished();
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getEntry();
}else{
entry=container.refreshEntry(entry);
entry=container.getNextEntry(entry);
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=container.get(entry);
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
.getMessageEntry());
if(messageReference!=null){
Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m);
count++;
}
container.setBatchEntry(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned && listener.hasSpace());
}
}
listener.finished();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
subscriberMessages.put(key,tsc);
return container;
}
protected void removeSubscriberMessageContainer(Object key) throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
store.deleteListContainer(key,"topic-subs");
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.size();
}
/**
* @param context
* @param messageId
* @param expirationTime
* @param messageRef
* @throws IOException
* @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
* org.apache.activemq.command.MessageId, long, java.lang.String)
*/
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
throw new IOException("Not supported");
}
/**
* @param identity
* @return String
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
*/
public String getMessageReference(MessageId identity) throws IOException{
return null;
}
/**
* @param context
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
*/
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
container.clear();
}
}
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
if(topicSubContainer!=null){
topicSubContainer.reset();
}
}
public Location checkpoint() throws IOException{
return null;
}
public synchronized void replayAcknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId){
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
}
}

View File

@ -1,303 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
/**
*/
public class RapidTransactionStore implements TransactionStore {
private final RapidPersistenceAdapter peristenceAdapter;
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public RapidMessageStore store;
public Object data;
public Location location;
public TxOperation(byte operationType, RapidMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
this.location = location;
}
}
/**
* Operations
* @version $Revision: 1.6 $
*/
public static class Tx {
private final Location location;
private ArrayList operations = new ArrayList();
public Tx(Location location) {
this.location=location;
}
public void add(RapidMessageStore store, Message msg, Location loc) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc));
}
public void add(RapidMessageStore store, MessageAck ack, Location loc) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc));
}
public void add(RapidTopicMessageStore store, JournalTopicAck ack, Location loc) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc));
}
public Message[] getMessages() {
ArrayList list = new ArrayList();
for (Iterator iter = operations.iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList list = new ArrayList();
for (Iterator iter = operations.iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList getOperations() {
return operations;
}
}
public RapidTransactionStore(RapidPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException {
Tx tx = (Tx) inflightTransactions.remove(txid);
if (tx == null)
return;
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
preparedTransactions.put(txid, tx);
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException {
Tx tx = (Tx) inflightTransactions.remove(txid);
if (tx == null)
return;
preparedTransactions.put(txid, tx);
}
public Tx getTx(Object txid, Location location) {
Tx tx = (Tx) inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx(location);
inflightTransactions.put(txid, tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
Tx tx;
if (wasPrepared) {
tx = (Tx) preparedTransactions.remove(txid);
} else {
tx = (Tx) inflightTransactions.remove(txid);
}
if (tx == null)
return;
if (txid.isXATransaction()) {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared),
true);
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
true);
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
if (wasPrepared) {
return (Tx) preparedTransactions.remove(txid);
} else {
return (Tx) inflightTransactions.remove(txid);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
Tx tx = (Tx) inflightTransactions.remove(txid);
if (tx != null)
tx = (Tx) preparedTransactions.remove(txid);
if (tx != null) {
if (txid.isXATransaction()) {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
true);
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
true);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void replayRollback(TransactionId txid) throws IOException {
if (inflightTransactions.remove(txid) != null)
preparedTransactions.remove(txid);
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the in-flight transactions get rolled back..
inflightTransactions.clear();
this.doingRecover = true;
try {
for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
Object txid = (Object) iter.next();
Tx tx = (Tx) preparedTransactions.get(txid);
listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
}
} finally {
this.doingRecover = false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(RapidMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message, location);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(RapidMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack, location);
}
public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack, location);
}
public Location checkpoint() throws IOException {
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
Location rc = null;
for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
Tx tx = (Tx) iter.next();
Location location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}
}
for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
Tx tx = (Tx) iter.next();
Location location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}
}
return rc;
}
public boolean isDoingRecover() {
return doingRecover;
}
}

View File

@ -1,25 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
experimental store implementation
</body>
</html>