git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1296469 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-02 21:11:30 +00:00
parent ac54a611f3
commit d40f398b31
8 changed files with 229 additions and 94 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -35,75 +36,87 @@ abstract public class AbstractMessageStore implements MessageStore {
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
}
@Override
public void dispose(ConnectionContext context) {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public ActiveMQDestination getDestination() {
return destination;
}
@Override
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
@Override
public void setBatch(MessageId messageId) throws IOException, Exception {
}
/**
* flag to indicate if the store is empty
*
*
* @return true if the message count is 0
* @throws Exception
*/
@Override
public boolean isEmpty() throws Exception {
return getMessageCount() == 0;
}
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
}
@Override
public boolean isPrioritizedMessages() {
return this.prioritizedMessages;
}
public void addMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException{
addMessage(context,message);
@Override
public void addMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException{
addMessage(context, message);
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
addMessage(context, message,canOptimizeHint);
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
addMessage(context, message,canOptimizeHint);
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(context, ack);
}
static class CallableImplementation implements Callable<Object> {
public Object call() throws Exception {
return null;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -28,14 +29,14 @@ import org.apache.activemq.usage.MemoryUsage;
/**
* Represents a message store which is used by the persistent implementations
*
*
*
*
*/
public interface MessageStore extends Service {
/**
* Adds a message to the message store
*
*
* @param context context
* @param message
* @throws IOException
@ -50,15 +51,15 @@ public interface MessageStore extends Service {
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
* @throws IOException
*/
void addMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
*
*
* @param context context
* @param message
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
@ -73,20 +74,20 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
*
*
* @param context context
* @param message
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/**
/**
* Adds a message to the message store
*
* @param context context
@ -96,14 +97,13 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Looks up a message using either the String messageID or the
* messageNumber. Implementations are encouraged to fill in the missing key
* if its easy to do so.
*
*
* @param identity which contains either the messageID or the messageNumber
* @return the message or null if it does not exist
* @throws IOException
@ -112,7 +112,7 @@ public interface MessageStore extends Service {
/**
* Removes a message from the message store.
*
*
* @param context
* @param ack the ack request that cause the message to be removed. It
* conatins the identity which contains the messageID of the
@ -120,12 +120,12 @@ public interface MessageStore extends Service {
* @throws IOException
*/
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException;
/**
* Removes all the messages from the message store.
*
*
* @param context
* @throws IOException
*/
@ -133,7 +133,7 @@ public interface MessageStore extends Service {
/**
* Recover any messages to be delivered.
*
*
* @param container
* @throws Exception
*/
@ -141,7 +141,7 @@ public interface MessageStore extends Service {
/**
* The destination that the message store is holding messages for.
*
*
* @return the destination
*/
ActiveMQDestination getDestination();
@ -155,13 +155,13 @@ public interface MessageStore extends Service {
/**
* @return the number of messages ready to deliver
* @throws IOException
*
*
*/
int getMessageCount() throws IOException;
/**
* A hint to the Store to reset any batching state for the Destination
*
*
*/
void resetBatching();
@ -172,27 +172,27 @@ public interface MessageStore extends Service {
/**
* allow caching cursors to set the current batch offset when cache is exhausted
* @param messageId
* @throws Exception
* @throws Exception
*/
void setBatch(MessageId messageId) throws Exception;
/**
* flag to indicate if the store is empty
* @return true if the message count is 0
* @throws Exception
* @throws Exception
*/
boolean isEmpty() throws Exception;
/**
* A hint to the store to try recover messages according to priority
* @param prioritizedMessages
*/
public void setPrioritizedMessages(boolean prioritizedMessages);
/**
*
*
* @return true if store is trying to recover messages according to priority
*/
public boolean isPrioritizedMessages();
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -40,96 +41,117 @@ public class ProxyMessageStore implements MessageStore {
return delegate;
}
@Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
delegate.addMessage(context, message);
}
@Override
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
delegate.addMessage(context,message,canOptimizeHint);
}
@Override
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@Override
public void recover(MessageRecoveryListener listener) throws Exception {
delegate.recover(listener);
}
@Override
public void removeAllMessages(ConnectionContext context) throws IOException {
delegate.removeAllMessages(context);
}
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeMessage(context, ack);
}
@Override
public void start() throws Exception {
delegate.start();
}
@Override
public void stop() throws Exception {
delegate.stop();
}
@Override
public void dispose(ConnectionContext context) {
delegate.dispose(context);
}
@Override
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@Override
public void setMemoryUsage(MemoryUsage memoryUsage) {
delegate.setMemoryUsage(memoryUsage);
}
@Override
public int getMessageCount() throws IOException {
return delegate.getMessageCount();
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}
@Override
public void resetBatching() {
delegate.resetBatching();
}
@Override
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
@Override
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return asyncAddTopicMessage(context,message,canOptimizeHint);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
delegate.removeAsyncMessage(context, ack);
}
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
delegate.setPrioritizedMessages(prioritizedMessages);
}
@Override
public boolean isPrioritizedMessages() {
return delegate.isPrioritizedMessages();
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -41,135 +42,165 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate;
}
@Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
delegate.addMessage(context, message);
}
@Override
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
delegate.addMessage(context,message,canOptimizeHint);
delegate.addMessage(context, message, canOptimizeHint);
}
@Override
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@Override
public void recover(MessageRecoveryListener listener) throws Exception {
delegate.recover(listener);
}
@Override
public void removeAllMessages(ConnectionContext context) throws IOException {
delegate.removeAllMessages(context);
}
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeMessage(context, ack);
}
@Override
public void start() throws Exception {
delegate.start();
}
@Override
public void stop() throws Exception {
delegate.stop();
}
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return delegate.lookupSubscription(clientId, subscriptionName);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
@Override
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
delegate.addSubsciption(subscriptionInfo, retroactive);
}
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
delegate.deleteSubscription(clientId, subscriptionName);
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
throws Exception {
delegate.recoverSubscription(clientId, subscriptionName, listener);
}
@Override
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
delegate.resetBatching(clientId, subscriptionName);
}
@Override
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return delegate.getAllSubscriptions();
}
@Override
public void setMemoryUsage(MemoryUsage memoryUsage) {
delegate.setMemoryUsage(memoryUsage);
}
@Override
public int getMessageCount(String clientId, String subscriberName) throws IOException {
return delegate.getMessageCount(clientId, subscriberName);
}
@Override
public int getMessageCount() throws IOException {
return delegate.getMessageCount();
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}
@Override
public void dispose(ConnectionContext context) {
delegate.dispose(context);
}
@Override
public void resetBatching() {
delegate.resetBatching();
}
@Override
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
@Override
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddTopicMessage(context,message,canOptimizeHint);
return delegate.asyncAddTopicMessage(context,message, canOptimizeHint);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
return delegate.asyncAddQueueMessage(context,message, canOptimizeHint);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
delegate.setPrioritizedMessages(prioritizedMessages);
}
@Override
public boolean isPrioritizedMessages() {
return delegate.isPrioritizedMessages();
}

View File

@ -21,7 +21,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@ -43,16 +43,16 @@ import org.slf4j.LoggerFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
*
*
*
*/
public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
private final Map transactions = new ConcurrentHashMap();
private final Map prepared;
private final KahaPersistenceAdapter adaptor;
private BrokerService brokerService;
KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
@ -67,6 +67,11 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
@ -150,19 +155,19 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message) throws IOException {
try {
if (message.isInTransaction()) {
KahaTransaction tx = getOrCreateTx(message.getTransactionId());
tx.add((KahaMessageStore)destination, message);
} else {
destination.addMessage(null, message);
}
} catch (RuntimeStoreException rse) {
try {
if (message.isInTransaction()) {
KahaTransaction tx = getOrCreateTx(message.getTransactionId());
tx.add((KahaMessageStore)destination, message);
} else {
destination.addMessage(null, message);
}
} catch (RuntimeStoreException rse) {
if (rse.getCause() instanceof IOException) {
brokerService.handleIOException((IOException)rse.getCause());
}
throw rse;
}
}
}
/**
@ -170,19 +175,19 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
* @throws IOException
*/
final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
try {
if (ack.isInTransaction()) {
KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore)destination, ack);
} else {
destination.removeMessage(null, ack);
}
} catch (RuntimeStoreException rse) {
try {
if (ack.isInTransaction()) {
KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore)destination, ack);
} else {
destination.removeMessage(null, ack);
}
} catch (RuntimeStoreException rse) {
if (rse.getCause() instanceof IOException) {
brokerService.handleIOException((IOException)rse.getCause());
}
throw rse;
}
}
}
final void acknowledge(final TopicMessageStore destination, String clientId,
@ -233,7 +238,7 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
return adaptor.retrieveMessageStore(id);
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -55,8 +56,8 @@ import org.slf4j.LoggerFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
*
*
*
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@ -119,7 +120,7 @@ public class KahaDBTransactionStore implements TransactionStore {
cmd.run();
results.add(cmd.run());
}
return results;
}
}
@ -156,11 +157,21 @@ public class KahaDBTransactionStore implements TransactionStore {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
@ -180,11 +191,21 @@ public class KahaDBTransactionStore implements TransactionStore {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);

View File

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
@ -70,11 +71,21 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
@ -89,11 +100,21 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);

View File

@ -21,7 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -40,8 +40,8 @@ import org.apache.activemq.store.TransactionStore;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
*
*
*
*/
public class MemoryTransactionStore implements TransactionStore {
@ -91,7 +91,7 @@ public class MemoryTransactionStore implements TransactionStore {
ConnectionContext ctx = new ConnectionContext();
persistenceAdapter.beginTransaction(ctx);
try {
// Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
@ -102,7 +102,7 @@ public class MemoryTransactionStore implements TransactionStore {
RemoveMessageCommand cmd = iter.next();
cmd.run(ctx);
}
} catch ( IOException e ) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
@ -110,7 +110,7 @@ public class MemoryTransactionStore implements TransactionStore {
persistenceAdapter.commitTransaction(ctx);
}
}
public interface AddMessageCommand {
Message getMessage();
@ -122,7 +122,7 @@ public class MemoryTransactionStore implements TransactionStore {
void run(ConnectionContext context) throws IOException;
}
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
this.persistenceAdapter=persistenceAdapter;
}
@ -134,20 +134,31 @@ public class MemoryTransactionStore implements TransactionStore {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
@ -159,20 +170,31 @@ public class MemoryTransactionStore implements TransactionStore {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
@ -285,7 +307,7 @@ public class MemoryTransactionStore implements TransactionStore {
destination.addMessage(null, message);
}
}
/**
* @param ack
* @throws IOException