diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryMessageStore.java deleted file mode 100755 index 0f53ff12fc..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryMessageStore.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.memory; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; - -/** - * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a - * - * @version $Revision: 1.7 $ - */ -public class MemoryMessageStore implements MessageStore { - - protected final ActiveMQDestination destination; - protected final Map messageTable; - - public MemoryMessageStore(ActiveMQDestination destination) { - this(destination, new LinkedHashMap()); - } - - public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) { - this.destination = destination; - this.messageTable = Collections.synchronizedMap(messageTable); - } - - public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { - messageTable.put(message.getMessageId(), message); - } - public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { - messageTable.put(messageId, messageRef); - } - - public Message getMessage(MessageId identity) throws IOException { - return (Message) messageTable.get(identity); - } - public String getMessageReference(MessageId identity) throws IOException { - return (String) messageTable.get(identity); - } - - public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - messageTable.remove(ack.getLastMessageId()); - } - - public void removeMessage(MessageId msgId) throws IOException { - messageTable.remove(msgId); - } - - public void recover(MessageRecoveryListener listener) throws Exception { - // the message table is a synchronizedMap - so just have to synchronize here - synchronized(messageTable){ - for(Iterator iter=messageTable.values().iterator();iter.hasNext();){ - Object msg=(Object) iter.next(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String) msg); - }else{ - listener.recoverMessage((Message) msg); - } - } - listener.finished(); - } - } - - public void start() throws IOException { - } - - public void stop(long timeout) throws IOException { - } - - public void removeAllMessages(ConnectionContext context) throws IOException { - messageTable.clear(); - } - - public ActiveMQDestination getDestination() { - return destination; - } - - public void delete() { - messageTable.clear(); - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryPersistenceAdapter.java deleted file mode 100755 index e63b234aaf..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryPersistenceAdapter.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.memory; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionStore; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; - -/** - * @org.apache.xbean.XBean - * - * @version $Revision: 1.4 $ - */ -public class MemoryPersistenceAdapter implements PersistenceAdapter { - private static final Log log = LogFactory.getLog(MemoryPersistenceAdapter.class); - - MemoryTransactionStore transactionStore; - ConcurrentHashMap topics = new ConcurrentHashMap(); - ConcurrentHashMap queues = new ConcurrentHashMap(); - private boolean useExternalMessageReferences; - - public Set getDestinations() { - Set rc = new HashSet(queues.size()+topics.size()); - for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) { - rc.add( iter.next() ); - } - for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) { - rc.add( iter.next() ); - } - return rc; - } - - public static MemoryPersistenceAdapter newInstance(File file) { - return new MemoryPersistenceAdapter(); - } - - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - MessageStore rc = (MessageStore)queues.get(destination); - if(rc==null) { - rc = new MemoryMessageStore(destination); - if( transactionStore !=null ) { - rc = transactionStore.proxy(rc); - } - queues.put(destination, rc); - } - return rc; - } - - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - TopicMessageStore rc = (TopicMessageStore)topics.get(destination); - if(rc==null) { - rc = new MemoryTopicMessageStore(destination); - if( transactionStore !=null ) { - rc = transactionStore.proxy(rc); - } - topics.put(destination, rc); - } - return rc; - } - - public TransactionStore createTransactionStore() throws IOException { - if( transactionStore==null ) { - transactionStore = new MemoryTransactionStore(); - } - return transactionStore; - } - - public void beginTransaction(ConnectionContext context) { - } - - public void commitTransaction(ConnectionContext context) { - } - - public void rollbackTransaction(ConnectionContext context) { - } - - public void start() throws Exception { - } - - public void stop() throws Exception { - } - - public long getLastMessageBrokerSequenceId() throws IOException { - return 0; - } - - public void deleteAllMessages() throws IOException { - for (Iterator iter = topics.values().iterator(); iter.hasNext();) { - MemoryMessageStore store = asMemoryMessageStore(iter.next()); - if (store != null) { - store.delete(); - } - } - for (Iterator iter = queues.values().iterator(); iter.hasNext();) { - MemoryMessageStore store = asMemoryMessageStore(iter.next()); - if (store != null) { - store.delete(); - } - } - - if (transactionStore != null) { - transactionStore.delete(); - } - } - - public boolean isUseExternalMessageReferences() { - return useExternalMessageReferences; - } - - public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { - this.useExternalMessageReferences = useExternalMessageReferences; - } - - protected MemoryMessageStore asMemoryMessageStore(Object value) { - if (value instanceof MemoryMessageStore) { - return (MemoryMessageStore) value; - } - log.warn("Expected an instance of MemoryMessageStore but was: " + value); - return null; - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTopicMessageStore.java deleted file mode 100755 index 8edc30a32b..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTopicMessageStore.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.memory; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.util.SubscriptionKey; - -/** - * @version $Revision: 1.5 $ - */ -public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore { - - private Map ackDatabase; - private Map subscriberDatabase; - MessageId lastMessageId; - - public MemoryTopicMessageStore(ActiveMQDestination destination) { - this(destination, new LinkedHashMap(), makeMap(), makeMap()); - } - protected static Map makeMap() { - return Collections.synchronizedMap(new HashMap()); - } - - public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) { - super(destination, messageTable); - this.subscriberDatabase = subscriberDatabase; - this.ackDatabase = ackDatabase; - } - - public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { - super.addMessage(context, message); - lastMessageId = message.getMessageId(); - } - - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { - ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId); - } - - public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { - return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName)); - } - - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { - SubscriptionInfo info = new SubscriptionInfo(); - info.setDestination(destination); - info.setClientId(clientId); - info.setSelector(selector); - info.setSubcriptionName(subscriptionName); - SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); - subscriberDatabase.put(key, info); - MessageId l=retroactive ? null : lastMessageId; - if( l!=null ) { - ackDatabase.put(key, l); - } - } - - public void deleteSubscription(String clientId, String subscriptionName) { - org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); - ackDatabase.remove(key); - subscriberDatabase.remove(key); - } - - public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) - throws Exception{ - MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName)); - boolean pastLastAck=lastAck==null; - // the message table is a synchronizedMap - so just have to synchronize here - synchronized(messageTable){ - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ - Map.Entry entry=(Entry) iter.next(); - if(pastLastAck){ - Object msg=entry.getValue(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String) msg); - }else{ - listener.recoverMessage((Message) msg); - } - }else{ - pastLastAck=entry.getKey().equals(lastAck); - } - } - listener.finished(); - } - } - - public void delete() { - super.delete(); - ackDatabase.clear(); - subscriberDatabase.clear(); - lastMessageId=null; - } - - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTransactionStore.java deleted file mode 100755 index 08cfcb574d..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MemoryTransactionStore.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.memory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; - -import javax.transaction.xa.XAException; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.ProxyMessageStore; -import org.apache.activemq.store.ProxyTopicMessageStore; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionRecoveryListener; -import org.apache.activemq.store.TransactionStore; - -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; - -/** - * Provides a TransactionStore implementation that can create transaction aware - * MessageStore objects from non transaction aware MessageStore objects. - * - * @version $Revision: 1.4 $ - */ -public class MemoryTransactionStore implements TransactionStore { - - ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); - - ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); - - private boolean doingRecover; - - public static class Tx { - private ArrayList messages = new ArrayList(); - - private ArrayList acks = new ArrayList(); - - public void add(AddMessageCommand msg) { - messages.add(msg); - } - - public void add(RemoveMessageCommand ack) { - acks.add(ack); - } - - public Message[] getMessages() { - Message rc[] = new Message[messages.size()]; - int count=0; - for (Iterator iter = messages.iterator(); iter.hasNext();) { - AddMessageCommand cmd = (AddMessageCommand) iter.next(); - rc[count++] = cmd.getMessage(); - } - return rc; - } - - public MessageAck[] getAcks() { - MessageAck rc[] = new MessageAck[acks.size()]; - int count=0; - for (Iterator iter = acks.iterator(); iter.hasNext();) { - RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next(); - rc[count++] = cmd.getMessageAck(); - } - return rc; - } - - /** - * @throws IOException - */ - public void commit() throws IOException { - // Do all the message adds. - for (Iterator iter = messages.iterator(); iter.hasNext();) { - AddMessageCommand cmd = (AddMessageCommand) iter.next(); - cmd.run(); - } - // And removes.. - for (Iterator iter = acks.iterator(); iter.hasNext();) { - RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next(); - cmd.run(); - } - } - } - - public interface AddMessageCommand { - Message getMessage(); - void run() throws IOException; - } - - public interface RemoveMessageCommand { - MessageAck getMessageAck(); - void run() throws IOException; - } - - public MessageStore proxy(MessageStore messageStore) { - return new ProxyMessageStore(messageStore) { - public void addMessage(ConnectionContext context, final Message send) throws IOException { - MemoryTransactionStore.this.addMessage(getDelegate(), send); - } - - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { - MemoryTransactionStore.this.removeMessage(getDelegate(), ack); - } - }; - } - - public TopicMessageStore proxy(TopicMessageStore messageStore) { - return new ProxyTopicMessageStore(messageStore) { - public void addMessage(ConnectionContext context, final Message send) throws IOException { - MemoryTransactionStore.this.addMessage(getDelegate(), send); - } - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { - MemoryTransactionStore.this.removeMessage(getDelegate(), ack); - } - }; - } - - /** - * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) - */ - public void prepare(TransactionId txid) { - Tx tx = (Tx) inflightTransactions.remove(txid); - if (tx == null) - return; - preparedTransactions.put(txid, tx); - } - - public Tx getTx(Object txid) { - Tx tx = (Tx) inflightTransactions.get(txid); - if (tx == null) { - tx = new Tx(); - inflightTransactions.put(txid, tx); - } - return tx; - } - - /** - * @throws XAException - * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) - */ - public void commit(TransactionId txid, boolean wasPrepared) throws IOException { - - Tx tx; - if( wasPrepared ) { - tx = (Tx) preparedTransactions.remove(txid); - } else { - tx = (Tx) inflightTransactions.remove(txid); - } - - if( tx == null ) - return; - tx.commit(); - - } - - /** - * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) - */ - public void rollback(TransactionId txid) { - preparedTransactions.remove(txid); - inflightTransactions.remove(txid); - } - - public void start() throws Exception { - } - - public void stop() throws Exception { - } - - synchronized public void recover(TransactionRecoveryListener listener) throws IOException { - // All the inflight transactions get rolled back.. - inflightTransactions.clear(); - this.doingRecover = true; - try { - for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { - Object txid = (Object) iter.next(); - Tx tx = (Tx) preparedTransactions.get(txid); - listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks()); - } - } finally { - this.doingRecover = false; - } - } - - /** - * @param message - * @throws IOException - */ - void addMessage(final MessageStore destination, final Message message) throws IOException { - - if( doingRecover ) - return; - - if (message.getTransactionId()!=null) { - Tx tx = getTx(message.getTransactionId()); - tx.add(new AddMessageCommand() { - public Message getMessage() { - return message; - } - public void run() throws IOException { - destination.addMessage(null, message); - } - }); - } else { - destination.addMessage(null, message); - } - } - - /** - * @param ack - * @throws IOException - */ - private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException { - if( doingRecover ) - return; - - if (ack.isInTransaction()) { - Tx tx = getTx(ack.getTransactionId()); - tx.add(new RemoveMessageCommand() { - public MessageAck getMessageAck() { - return ack; - } - public void run() throws IOException { - destination.removeMessage(null, ack); - } - }); - } else { - destination.removeMessage(null, ack); - } - } - - public void delete() { - inflightTransactions.clear(); - preparedTransactions.clear(); - doingRecover=false; - } - -}